build: Moved user and group management commands and unit tests to edx-django-utils
- Removed manage_user and manage_group commands and their unit tests from edx-platform and added then to edx-django-utils. - Modified User.post_save signal to ensure the user profile is created when manage_user management command is run to create a user. - Added edx-django-utils to INSTALLED_APPS for LMS and Studio. - Moved generate_password from openedx.core.djangoapps.user_authn.utils to edx_django_utils.user along with its unit test.
This commit is contained in:
@@ -1620,6 +1620,9 @@ INSTALLED_APPS = [
|
||||
# Database-backed Organizations App (http://github.com/edx/edx-organizations)
|
||||
'organizations',
|
||||
|
||||
# User and group management via edx-django-utils
|
||||
'edx_django_utils.user',
|
||||
|
||||
# Allow Studio to use LMS for SSO
|
||||
'social_django',
|
||||
]
|
||||
|
||||
@@ -1,126 +0,0 @@
|
||||
"""
|
||||
Management command `manage_group` is used to idempotently create Django groups
|
||||
and set their permissions by name.
|
||||
"""
|
||||
|
||||
|
||||
from django.apps import apps
|
||||
from django.contrib.auth.models import Group, Permission
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.db import transaction
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
|
||||
class Command(BaseCommand): # lint-amnesty, pylint: disable=missing-class-docstring
|
||||
help = 'Creates the specified group, if it does not exist, and sets its permissions.'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('group_name')
|
||||
parser.add_argument('--remove', dest='is_remove', action='store_true')
|
||||
parser.add_argument('-p', '--permissions', nargs='*', default=[])
|
||||
|
||||
def _handle_remove(self, group_name): # lint-amnesty, pylint: disable=missing-function-docstring
|
||||
|
||||
try:
|
||||
Group.objects.get(name=group_name).delete()
|
||||
self.stderr.write(_('Removed group: "{}"').format(group_name))
|
||||
except Group.DoesNotExist:
|
||||
self.stderr.write(_('Did not find a group with name "{}" - skipping.').format(group_name))
|
||||
|
||||
@transaction.atomic
|
||||
def handle(self, group_name, is_remove, permissions=None, *args, **options): # lint-amnesty, pylint: disable=arguments-differ, keyword-arg-before-vararg
|
||||
|
||||
if is_remove:
|
||||
self._handle_remove(group_name)
|
||||
return
|
||||
|
||||
old_permissions = set()
|
||||
group, created = Group.objects.get_or_create(name=group_name)
|
||||
|
||||
if created:
|
||||
try:
|
||||
# Needed for sqlite backend (i.e. in tests) because
|
||||
# name.max_length won't be enforced by the db.
|
||||
# See also http://www.sqlite.org/faq.html#q9
|
||||
group.full_clean()
|
||||
except ValidationError as exc:
|
||||
# give a more helpful error
|
||||
raise CommandError( # lint-amnesty, pylint: disable=raise-missing-from
|
||||
_(
|
||||
'Invalid group name: "{group_name}". {messages}'
|
||||
).format(
|
||||
group_name=group_name,
|
||||
messages=exc.messages[0]
|
||||
)
|
||||
)
|
||||
self.stderr.write(_('Created new group: "{}"').format(group_name))
|
||||
else:
|
||||
self.stderr.write(_('Found existing group: "{}"').format(group_name))
|
||||
old_permissions = set(group.permissions.all())
|
||||
|
||||
new_permissions = self._resolve_permissions(permissions or set())
|
||||
|
||||
add_permissions = new_permissions - old_permissions
|
||||
remove_permissions = old_permissions - new_permissions
|
||||
|
||||
self.stderr.write(
|
||||
_(
|
||||
'Adding {codenames} permissions to group "{group}"'
|
||||
).format(
|
||||
codenames=[ap.name for ap in add_permissions],
|
||||
group=group.name
|
||||
)
|
||||
)
|
||||
self.stderr.write(
|
||||
_(
|
||||
'Removing {codenames} permissions from group "{group}"'
|
||||
).format(
|
||||
codenames=[rp.codename for rp in remove_permissions],
|
||||
group=group.name
|
||||
)
|
||||
)
|
||||
|
||||
group.permissions.set(new_permissions)
|
||||
|
||||
group.save()
|
||||
|
||||
def _resolve_permissions(self, permissions): # lint-amnesty, pylint: disable=missing-function-docstring
|
||||
new_permissions = set()
|
||||
for permission in permissions:
|
||||
try:
|
||||
app_label, model_name, codename = permission.split(':')
|
||||
except ValueError:
|
||||
# give a more helpful error
|
||||
raise CommandError(_( # lint-amnesty, pylint: disable=raise-missing-from
|
||||
'Invalid permission option: "{}". Please specify permissions '
|
||||
'using the format: app_label:model_name:permission_codename.'
|
||||
).format(permission))
|
||||
# this will raise a LookupError if it fails.
|
||||
try:
|
||||
model_class = apps.get_model(app_label, model_name)
|
||||
except LookupError as exc:
|
||||
raise CommandError(str(exc)) # lint-amnesty, pylint: disable=raise-missing-from
|
||||
|
||||
# Fetch content type for model, including proxy models.
|
||||
content_type = ContentType.objects.get_for_model(model_class, for_concrete_model=False)
|
||||
try:
|
||||
new_permission = Permission.objects.get(
|
||||
content_type=content_type,
|
||||
codename=codename,
|
||||
)
|
||||
except Permission.DoesNotExist:
|
||||
# give a more helpful error
|
||||
raise CommandError( # lint-amnesty, pylint: disable=raise-missing-from
|
||||
_(
|
||||
'Invalid permission codename: "{codename}". No such permission exists '
|
||||
'for the model {module}.{model_name}.'
|
||||
).format(
|
||||
codename=codename,
|
||||
module=model_class.__module__,
|
||||
model_name=model_class.__name__,
|
||||
)
|
||||
)
|
||||
new_permissions.add(new_permission)
|
||||
return new_permissions
|
||||
@@ -1,163 +0,0 @@
|
||||
"""
|
||||
Management command `manage_user` is used to idempotently create or remove
|
||||
Django users, set/unset permission bits, and associate groups by name.
|
||||
"""
|
||||
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.hashers import is_password_usable, identify_hasher
|
||||
from django.contrib.auth.models import Group
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.db import transaction
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from openedx.core.djangoapps.user_authn.utils import generate_password
|
||||
from common.djangoapps.student.models import UserProfile
|
||||
|
||||
|
||||
def is_valid_django_hash(encoded):
|
||||
"""
|
||||
Starting with django 2.1, the function is_password_usable no longer checks whether encode
|
||||
is a valid password created by a django hasher (hasher in PASSWORD_HASHERS setting)
|
||||
|
||||
Adding this function to create constant behavior as we upgrade django versions
|
||||
"""
|
||||
try:
|
||||
identify_hasher(encoded)
|
||||
except ValueError:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
class Command(BaseCommand): # lint-amnesty, pylint: disable=missing-class-docstring
|
||||
help = 'Creates the specified user, if it does not exist, and sets its groups.'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('username')
|
||||
parser.add_argument('email')
|
||||
parser.add_argument('--remove', dest='is_remove', action='store_true')
|
||||
parser.add_argument('--superuser', dest='is_superuser', action='store_true')
|
||||
parser.add_argument('--staff', dest='is_staff', action='store_true')
|
||||
parser.add_argument('--unusable-password', dest='unusable_password', action='store_true')
|
||||
parser.add_argument('--initial-password-hash', dest='initial_password_hash')
|
||||
parser.add_argument('-g', '--groups', nargs='*', default=[])
|
||||
|
||||
def _maybe_update(self, user, attribute, new_value):
|
||||
"""
|
||||
DRY helper. If the specified attribute of the user differs from the
|
||||
specified value, it will be updated.
|
||||
"""
|
||||
old_value = getattr(user, attribute)
|
||||
if new_value != old_value:
|
||||
self.stderr.write(
|
||||
_('Setting {attribute} for user "{username}" to "{new_value}"').format(
|
||||
attribute=attribute, username=user.username, new_value=new_value
|
||||
)
|
||||
)
|
||||
setattr(user, attribute, new_value)
|
||||
|
||||
def _check_email_match(self, user, email):
|
||||
"""
|
||||
DRY helper.
|
||||
|
||||
Requiring the user to specify both username and email will help catch
|
||||
certain issues, for example if the expected username has already been
|
||||
taken by someone else.
|
||||
"""
|
||||
if user.email.lower() != email.lower():
|
||||
# The passed email address doesn't match this username's email address.
|
||||
# Assume a problem and fail.
|
||||
raise CommandError(
|
||||
_(
|
||||
'Skipping user "{}" because the specified and existing email '
|
||||
'addresses do not match.'
|
||||
).format(user.username)
|
||||
)
|
||||
|
||||
def _handle_remove(self, username, email): # lint-amnesty, pylint: disable=missing-function-docstring
|
||||
try:
|
||||
user = get_user_model().objects.get(username=username)
|
||||
except get_user_model().DoesNotExist:
|
||||
self.stderr.write(_('Did not find a user with username "{}" - skipping.').format(username))
|
||||
return
|
||||
self._check_email_match(user, email)
|
||||
self.stderr.write(_('Removing user: "{}"').format(user))
|
||||
user.delete()
|
||||
|
||||
@transaction.atomic
|
||||
def handle(self, username, email, is_remove, is_staff, is_superuser, groups, # lint-amnesty, pylint: disable=arguments-differ
|
||||
unusable_password, initial_password_hash, *args, **options):
|
||||
|
||||
if is_remove:
|
||||
return self._handle_remove(username, email)
|
||||
|
||||
old_groups, new_groups = set(), set()
|
||||
user, created = get_user_model().objects.get_or_create(
|
||||
username=username,
|
||||
defaults={'email': email}
|
||||
)
|
||||
|
||||
if created:
|
||||
if initial_password_hash:
|
||||
if not (is_password_usable(initial_password_hash) and is_valid_django_hash(initial_password_hash)):
|
||||
raise CommandError(f'The password hash provided for user {username} is invalid.')
|
||||
user.password = initial_password_hash
|
||||
else:
|
||||
# Set the password to a random, unknown, but usable password
|
||||
# allowing self-service password resetting. Cases where unusable
|
||||
# passwords are required, should be explicit, and will be handled below.
|
||||
user.set_password(generate_password(length=25))
|
||||
self.stderr.write(_('Created new user: "{}"').format(user))
|
||||
else:
|
||||
# NOTE, we will not update the email address of an existing user.
|
||||
self.stderr.write(_('Found existing user: "{}"').format(user))
|
||||
self._check_email_match(user, email)
|
||||
old_groups = set(user.groups.all())
|
||||
|
||||
self._maybe_update(user, 'is_staff', is_staff)
|
||||
self._maybe_update(user, 'is_superuser', is_superuser)
|
||||
|
||||
# Set unusable password if specified
|
||||
if unusable_password and user.has_usable_password():
|
||||
self.stderr.write(_('Setting unusable password for user "{}"').format(user))
|
||||
user.set_unusable_password()
|
||||
|
||||
# Ensure the user has a profile
|
||||
try:
|
||||
__ = user.profile
|
||||
except UserProfile.DoesNotExist:
|
||||
UserProfile.objects.create(user=user)
|
||||
self.stderr.write(_('Created new profile for user: "{}"').format(user))
|
||||
|
||||
# resolve the specified groups
|
||||
for group_name in groups or set():
|
||||
|
||||
try:
|
||||
group = Group.objects.get(name=group_name)
|
||||
new_groups.add(group)
|
||||
except Group.DoesNotExist:
|
||||
# warn, but move on.
|
||||
self.stderr.write(_('Could not find a group named "{}" - skipping.').format(group_name))
|
||||
|
||||
add_groups = new_groups - old_groups
|
||||
remove_groups = old_groups - new_groups
|
||||
|
||||
self.stderr.write(
|
||||
_(
|
||||
'Adding user "{username}" to groups {group_names}'
|
||||
).format(
|
||||
username=user.username,
|
||||
group_names=[g.name for g in add_groups]
|
||||
)
|
||||
)
|
||||
self.stderr.write(
|
||||
_(
|
||||
'Removing user "{username}" from groups {group_names}'
|
||||
).format(
|
||||
username=user.username,
|
||||
group_names=[g.name for g in remove_groups]
|
||||
)
|
||||
)
|
||||
|
||||
user.groups.set(new_groups)
|
||||
user.save()
|
||||
@@ -1,187 +0,0 @@
|
||||
"""
|
||||
Unit tests for user_management management commands.
|
||||
"""
|
||||
|
||||
|
||||
import sys
|
||||
import pytest
|
||||
import ddt
|
||||
|
||||
from django.contrib.auth.models import Group, Permission
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.management import CommandError, call_command
|
||||
from django.test import TestCase
|
||||
|
||||
TEST_EMAIL = 'test@example.com'
|
||||
TEST_GROUP = 'test-group'
|
||||
TEST_USERNAME = 'test-user'
|
||||
TEST_DATA = (
|
||||
{},
|
||||
{
|
||||
TEST_GROUP: ['add_group', 'change_group', 'change_group'],
|
||||
},
|
||||
{
|
||||
'other-group': ['add_group', 'change_group', 'change_group'],
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestManageGroupCommand(TestCase):
|
||||
"""
|
||||
Tests the `manage_group` command.
|
||||
"""
|
||||
|
||||
def set_group_permissions(self, group_permissions):
|
||||
"""
|
||||
Sets up a before-state for groups and permissions in tests, which
|
||||
can be checked afterward to ensure that a failed atomic
|
||||
operation has not had any side effects.
|
||||
"""
|
||||
content_type = ContentType.objects.get_for_model(Group)
|
||||
for group_name, permission_codenames in group_permissions.items():
|
||||
group = Group.objects.create(name=group_name)
|
||||
for codename in permission_codenames:
|
||||
group.permissions.add(
|
||||
Permission.objects.get(content_type=content_type, codename=codename)
|
||||
)
|
||||
|
||||
def check_group_permissions(self, group_permissions):
|
||||
"""
|
||||
Checks that the current state of the database matches the specified groups and
|
||||
permissions.
|
||||
"""
|
||||
self.check_groups(list(group_permissions.keys()))
|
||||
for group_name, permission_codenames in group_permissions.items():
|
||||
self.check_permissions(group_name, permission_codenames)
|
||||
|
||||
def check_groups(self, group_names):
|
||||
"""
|
||||
DRY helper.
|
||||
"""
|
||||
assert set(group_names) == {g.name for g in Group.objects.all()}
|
||||
|
||||
def check_permissions(self, group_name, permission_codenames):
|
||||
"""
|
||||
DRY helper.
|
||||
"""
|
||||
assert set(permission_codenames) == {p.codename for p in Group.objects.get(name=group_name).permissions.all()}
|
||||
|
||||
@ddt.data(
|
||||
*(
|
||||
(data, args, exception)
|
||||
for data in TEST_DATA
|
||||
for args, exception in (
|
||||
((), 'too few arguments' if sys.version_info.major == 2 else 'required: group_name'), # no group name
|
||||
(('x' * 151,), 'invalid group name'), # invalid group name
|
||||
((TEST_GROUP, 'some-other-group'), 'unrecognized arguments'), # multiple arguments
|
||||
((TEST_GROUP, '--some-option', 'dummy'), 'unrecognized arguments') # unexpected option name
|
||||
)
|
||||
)
|
||||
)
|
||||
@ddt.unpack
|
||||
def test_invalid_input(self, initial_group_permissions, command_args, exception_message):
|
||||
"""
|
||||
Ensures that invalid inputs result in errors with relevant output,
|
||||
and that no persistent state is changed.
|
||||
"""
|
||||
self.set_group_permissions(initial_group_permissions)
|
||||
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_group', *command_args)
|
||||
assert exception_message in str(exc_context.value).lower()
|
||||
self.check_group_permissions(initial_group_permissions)
|
||||
|
||||
@ddt.data(*TEST_DATA)
|
||||
def test_invalid_permission(self, initial_group_permissions):
|
||||
"""
|
||||
Ensures that a permission that cannot be parsed or resolved results in
|
||||
and error and that no persistent state is changed.
|
||||
"""
|
||||
self.set_group_permissions(initial_group_permissions)
|
||||
|
||||
# not parseable
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'fail')
|
||||
assert 'invalid permission option' in str(exc_context.value).lower()
|
||||
self.check_group_permissions(initial_group_permissions)
|
||||
|
||||
# not parseable
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'f:a:i:l')
|
||||
assert 'invalid permission option' in str(exc_context.value).lower()
|
||||
self.check_group_permissions(initial_group_permissions)
|
||||
|
||||
# invalid app label
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'nonexistent-label:dummy-model:dummy-perm')
|
||||
assert 'no installed app' in str(exc_context.value).lower()
|
||||
assert 'nonexistent-label' in str(exc_context.value).lower()
|
||||
self.check_group_permissions(initial_group_permissions)
|
||||
|
||||
# invalid model name
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'auth:nonexistent-model:dummy-perm')
|
||||
assert 'nonexistent-model' in str(exc_context.value).lower()
|
||||
self.check_group_permissions(initial_group_permissions)
|
||||
|
||||
# invalid model name
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'auth:Group:nonexistent-perm')
|
||||
assert 'invalid permission codename' in str(exc_context.value).lower()
|
||||
assert 'nonexistent-perm' in str(exc_context.value).lower()
|
||||
self.check_group_permissions(initial_group_permissions)
|
||||
|
||||
def test_group(self):
|
||||
"""
|
||||
Ensures that groups are created if they don't exist and reused if they do.
|
||||
"""
|
||||
self.check_groups([])
|
||||
call_command('manage_group', TEST_GROUP)
|
||||
self.check_groups([TEST_GROUP])
|
||||
|
||||
# check idempotency
|
||||
call_command('manage_group', TEST_GROUP)
|
||||
self.check_groups([TEST_GROUP])
|
||||
|
||||
def test_group_remove(self):
|
||||
"""
|
||||
Ensures that groups are removed if they exist and we exit cleanly otherwise.
|
||||
"""
|
||||
self.set_group_permissions({TEST_GROUP: ['add_group']})
|
||||
self.check_groups([TEST_GROUP])
|
||||
call_command('manage_group', TEST_GROUP, '--remove')
|
||||
self.check_groups([])
|
||||
|
||||
# check idempotency
|
||||
call_command('manage_group', TEST_GROUP, '--remove')
|
||||
self.check_groups([])
|
||||
|
||||
def test_permissions(self):
|
||||
"""
|
||||
Ensures that permissions are set on the group as specified.
|
||||
"""
|
||||
self.check_groups([])
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'auth:Group:add_group')
|
||||
self.check_groups([TEST_GROUP])
|
||||
self.check_permissions(TEST_GROUP, ['add_group'])
|
||||
|
||||
# check idempotency
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'auth:Group:add_group')
|
||||
self.check_groups([TEST_GROUP])
|
||||
self.check_permissions(TEST_GROUP, ['add_group'])
|
||||
|
||||
# check adding a permission
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'auth:Group:add_group', 'auth:Group:change_group')
|
||||
self.check_groups([TEST_GROUP])
|
||||
self.check_permissions(TEST_GROUP, ['add_group', 'change_group'])
|
||||
|
||||
# check removing a permission
|
||||
call_command('manage_group', TEST_GROUP, '--permissions', 'auth:Group:change_group')
|
||||
self.check_groups([TEST_GROUP])
|
||||
self.check_permissions(TEST_GROUP, ['change_group'])
|
||||
|
||||
# check removing all permissions
|
||||
call_command('manage_group', TEST_GROUP)
|
||||
self.check_groups([TEST_GROUP])
|
||||
self.check_permissions(TEST_GROUP, [])
|
||||
@@ -1,186 +0,0 @@
|
||||
"""
|
||||
Unit tests for user_management management commands.
|
||||
"""
|
||||
|
||||
|
||||
import itertools
|
||||
|
||||
import pytest
|
||||
import ddt
|
||||
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.contrib.auth.models import Group, User # lint-amnesty, pylint: disable=imported-auth-user
|
||||
from django.core.management import CommandError, call_command
|
||||
from django.test import TestCase
|
||||
|
||||
from openedx.core.djangoapps.user_authn.utils import generate_password
|
||||
|
||||
TEST_EMAIL = 'test@example.com'
|
||||
TEST_USERNAME = 'test-user'
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
class TestManageUserCommand(TestCase):
|
||||
"""
|
||||
Tests the `manage_user` command.
|
||||
"""
|
||||
|
||||
def test_user(self):
|
||||
"""
|
||||
Ensures that users are created if they don't exist and reused if they do.
|
||||
"""
|
||||
assert [] == list(User.objects.all())
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL)
|
||||
user = User.objects.get(username=TEST_USERNAME)
|
||||
assert user.username == TEST_USERNAME
|
||||
assert user.email == TEST_EMAIL
|
||||
assert user.profile is not None
|
||||
|
||||
# check idempotency
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL)
|
||||
assert [(TEST_USERNAME, TEST_EMAIL)] == [(u.username, u.email) for u in User.objects.all()]
|
||||
|
||||
def test_remove(self):
|
||||
"""
|
||||
Ensures that users are removed if they exist and exit cleanly otherwise.
|
||||
"""
|
||||
User.objects.create(username=TEST_USERNAME, email=TEST_EMAIL)
|
||||
assert [(TEST_USERNAME, TEST_EMAIL)] == [(u.username, u.email) for u in User.objects.all()]
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '--remove')
|
||||
assert [] == list(User.objects.all())
|
||||
|
||||
# check idempotency
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '--remove')
|
||||
assert [] == list(User.objects.all())
|
||||
|
||||
def test_unusable_password(self):
|
||||
"""
|
||||
Ensure that a user's password is set to an unusable_password.
|
||||
"""
|
||||
user = User.objects.create(username=TEST_USERNAME, email=TEST_EMAIL)
|
||||
assert [(TEST_USERNAME, TEST_EMAIL)] == [(u.username, u.email) for u in User.objects.all()]
|
||||
user.set_password(generate_password())
|
||||
user.save()
|
||||
|
||||
# Run once without passing --unusable-password and make sure the password is usable
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL)
|
||||
user = User.objects.get(username=TEST_USERNAME, email=TEST_EMAIL)
|
||||
assert user.has_usable_password()
|
||||
|
||||
# Make sure the user now has an unusable_password
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '--unusable-password')
|
||||
user = User.objects.get(username=TEST_USERNAME, email=TEST_EMAIL)
|
||||
assert not user.has_usable_password()
|
||||
|
||||
# check idempotency
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '--unusable-password')
|
||||
assert not user.has_usable_password()
|
||||
|
||||
def test_initial_password_hash(self):
|
||||
"""
|
||||
Ensure that a user's password hash is set correctly when the user is created,
|
||||
and that it isn't touched for existing users.
|
||||
"""
|
||||
initial_hash = make_password('hunter2')
|
||||
|
||||
# Make sure the command aborts if the provided hash isn't a valid Django password hash
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '--initial-password-hash', 'invalid_hash')
|
||||
assert 'password hash' in str(exc_context.value).lower()
|
||||
|
||||
# Make sure the hash gets set correctly for a new user
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '--initial-password-hash', initial_hash)
|
||||
user = User.objects.get(username=TEST_USERNAME)
|
||||
assert user.password == initial_hash
|
||||
|
||||
# Change the password
|
||||
new_hash = make_password('correct horse battery staple')
|
||||
user.password = new_hash
|
||||
user.save()
|
||||
|
||||
# Verify that calling manage_user again leaves the password untouched
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '--initial-password-hash', initial_hash)
|
||||
user = User.objects.get(username=TEST_USERNAME)
|
||||
assert user.password == new_hash
|
||||
|
||||
def test_wrong_email(self):
|
||||
"""
|
||||
Ensure that the operation is aborted if the username matches an
|
||||
existing user account but the supplied email doesn't match.
|
||||
"""
|
||||
User.objects.create(username=TEST_USERNAME, email=TEST_EMAIL)
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_user', TEST_USERNAME, 'other@example.com')
|
||||
assert 'email addresses do not match' in str(exc_context.value).lower()
|
||||
assert [(TEST_USERNAME, TEST_EMAIL)] == [(u.username, u.email) for u in User.objects.all()]
|
||||
|
||||
# check that removal uses the same check
|
||||
with pytest.raises(CommandError) as exc_context:
|
||||
call_command('manage_user', TEST_USERNAME, 'other@example.com', '--remove')
|
||||
assert 'email addresses do not match' in str(exc_context.value).lower()
|
||||
assert [(TEST_USERNAME, TEST_EMAIL)] == [(u.username, u.email) for u in User.objects.all()]
|
||||
|
||||
def test_same_email_varied_case(self):
|
||||
"""
|
||||
Ensure that the operation continues if the username matches an
|
||||
existing user account and the supplied email differs only in cases.
|
||||
"""
|
||||
User.objects.create(username=TEST_USERNAME, email=TEST_EMAIL.upper())
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL.lower())
|
||||
user = User.objects.get(username=TEST_USERNAME)
|
||||
assert user.email == TEST_EMAIL.upper()
|
||||
|
||||
@ddt.data(*itertools.product([(True, True), (True, False), (False, True), (False, False)], repeat=2))
|
||||
@ddt.unpack
|
||||
def test_bits(self, initial_bits, expected_bits):
|
||||
"""
|
||||
Ensure that the 'staff' and 'superuser' bits are set according to the
|
||||
presence / absence of the associated command options, regardless of
|
||||
any previous state.
|
||||
"""
|
||||
initial_staff, initial_super = initial_bits
|
||||
User.objects.create(
|
||||
username=TEST_USERNAME,
|
||||
email=TEST_EMAIL,
|
||||
is_staff=initial_staff,
|
||||
is_superuser=initial_super
|
||||
)
|
||||
|
||||
expected_staff, expected_super = expected_bits
|
||||
args = [opt for bit, opt in ((expected_staff, '--staff'), (expected_super, '--superuser')) if bit]
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, *args)
|
||||
user = User.objects.all().first()
|
||||
assert user.is_staff == expected_staff
|
||||
assert user.is_superuser == expected_super
|
||||
|
||||
@ddt.data(*itertools.product(('', 'a', 'ab', 'abc'), repeat=2))
|
||||
@ddt.unpack
|
||||
def test_groups(self, initial_groups, expected_groups):
|
||||
"""
|
||||
Ensures groups assignments are created and deleted idempotently.
|
||||
"""
|
||||
groups = {}
|
||||
for group_name in 'abc':
|
||||
groups[group_name] = Group.objects.create(name=group_name)
|
||||
|
||||
user = User.objects.create(username=TEST_USERNAME, email=TEST_EMAIL)
|
||||
for group_name in initial_groups:
|
||||
user.groups.add(groups[group_name])
|
||||
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '-g', *expected_groups)
|
||||
actual_groups = [group.name for group in user.groups.all()]
|
||||
assert actual_groups == list(expected_groups)
|
||||
|
||||
def test_nonexistent_group(self):
|
||||
"""
|
||||
Ensures the command does not fail if specified groups cannot be found.
|
||||
"""
|
||||
user = User.objects.create(username=TEST_USERNAME, email=TEST_EMAIL)
|
||||
groups = {}
|
||||
for group_name in 'abc':
|
||||
groups[group_name] = Group.objects.create(name=group_name)
|
||||
user.groups.add(groups[group_name])
|
||||
|
||||
call_command('manage_user', TEST_USERNAME, TEST_EMAIL, '-g', 'b', 'c', 'd')
|
||||
actual_groups = [group.name for group in user.groups.all()]
|
||||
assert actual_groups == ['b', 'c']
|
||||
@@ -828,6 +828,15 @@ def user_post_save_callback(sender, **kwargs):
|
||||
enrollment
|
||||
)
|
||||
|
||||
# Ensure the user has a profile when run via management command
|
||||
_called_by_management_command = getattr(user, '_called_by_management_command', None)
|
||||
if _called_by_management_command and kwargs['created']:
|
||||
try:
|
||||
__ = user.profile
|
||||
except UserProfile.DoesNotExist:
|
||||
UserProfile.objects.create(user=user)
|
||||
log.info('Created new profile for user: %s', user)
|
||||
|
||||
# Because `emit_field_changed_events` removes the record of the fields that
|
||||
# were changed, wait to do that until after we've checked them as part of
|
||||
# the condition on whether we want to check for automatic enrollments.
|
||||
|
||||
@@ -16,9 +16,10 @@ from common.djangoapps.student.models import UserPasswordToggleHistory
|
||||
from common.djangoapps.util.json_request import JsonResponse
|
||||
from lms.djangoapps.support.decorators import require_support_permission
|
||||
from openedx.core.djangoapps.user_api.accounts.serializers import AccountUserSerializer
|
||||
from openedx.core.djangoapps.user_authn.utils import generate_password
|
||||
from openedx.core.djangolib.oauth2_retirement_utils import retire_dot_oauth2_models
|
||||
|
||||
from edx_django_utils.user import generate_password
|
||||
|
||||
|
||||
class ManageUserSupportView(View):
|
||||
"""
|
||||
|
||||
@@ -3143,7 +3143,10 @@ INSTALLED_APPS = [
|
||||
'lms.djangoapps.bulk_user_retirement',
|
||||
|
||||
# Agreements
|
||||
'openedx.core.djangoapps.agreements'
|
||||
'openedx.core.djangoapps.agreements',
|
||||
|
||||
# User and group management via edx-django-utils
|
||||
'edx_django_utils.user'
|
||||
]
|
||||
|
||||
######################### CSRF #########################################
|
||||
|
||||
@@ -6,7 +6,7 @@ Django forms for accounts
|
||||
from django import forms
|
||||
from django.core.exceptions import ValidationError
|
||||
|
||||
from openedx.core.djangoapps.user_authn.utils import generate_password
|
||||
from edx_django_utils.user import generate_password
|
||||
|
||||
|
||||
class RetirementQueueDeletionForm(forms.Form):
|
||||
|
||||
@@ -10,7 +10,8 @@ import logging
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
|
||||
from openedx.core.djangoapps.user_api.models import UserRetirementStatus
|
||||
from openedx.core.djangoapps.user_authn.utils import generate_password
|
||||
|
||||
from edx_django_utils.user import generate_password
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -3,16 +3,13 @@
|
||||
|
||||
from collections import namedtuple
|
||||
from urllib.parse import urlencode # pylint: disable=import-error
|
||||
import pytest
|
||||
import ddt
|
||||
from django.test import TestCase
|
||||
from django.test.client import RequestFactory
|
||||
from django.test.utils import override_settings
|
||||
|
||||
from openedx.core.djangoapps.oauth_dispatch.tests.factories import ApplicationFactory
|
||||
from openedx.core.djangoapps.user_authn.utils import (
|
||||
generate_password, is_safe_login_or_logout_redirect
|
||||
)
|
||||
from openedx.core.djangoapps.user_authn.utils import is_safe_login_or_logout_redirect
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@@ -71,29 +68,3 @@ class TestRedirectUtils(TestCase):
|
||||
req = self.request.get(f'/logout?{urlencode(params)}', HTTP_HOST=host)
|
||||
actual_is_safe = self._is_safe_redirect(req, redirect_url)
|
||||
assert actual_is_safe == expected_is_safe
|
||||
|
||||
|
||||
class GeneratePasswordTest(TestCase):
|
||||
"""Tests formation of randomly generated passwords."""
|
||||
|
||||
def test_default_args(self):
|
||||
password = generate_password()
|
||||
assert 12 == len(password)
|
||||
assert any(c.isdigit for c in password)
|
||||
assert any(c.isalpha for c in password)
|
||||
|
||||
def test_length(self):
|
||||
length = 25
|
||||
assert length == len(generate_password(length=length))
|
||||
|
||||
def test_chars(self):
|
||||
char = '!'
|
||||
password = generate_password(length=12, chars=(char,))
|
||||
|
||||
assert any(c.isdigit for c in password)
|
||||
assert any(c.isalpha for c in password)
|
||||
assert (char * 10) == password[2:]
|
||||
|
||||
def test_min_length(self):
|
||||
with pytest.raises(ValueError):
|
||||
generate_password(length=7)
|
||||
|
||||
@@ -3,7 +3,6 @@ Utility functions used during user authentication.
|
||||
"""
|
||||
|
||||
import random
|
||||
import string
|
||||
from urllib.parse import urlparse # pylint: disable=import-error
|
||||
from uuid import uuid4 # lint-amnesty, pylint: disable=unused-import
|
||||
|
||||
@@ -49,20 +48,6 @@ def is_safe_login_or_logout_redirect(redirect_to, request_host, dot_client_id, r
|
||||
return is_safe_url
|
||||
|
||||
|
||||
def generate_password(length=12, chars=string.ascii_letters + string.digits):
|
||||
"""Generate a valid random password"""
|
||||
if length < 8:
|
||||
raise ValueError("password must be at least 8 characters")
|
||||
|
||||
choice = random.SystemRandom().choice
|
||||
|
||||
password = ''
|
||||
password += choice(string.digits)
|
||||
password += choice(string.ascii_letters)
|
||||
password += ''.join([choice(chars) for _i in range(length - 2)])
|
||||
return password
|
||||
|
||||
|
||||
def is_registration_api_v1(request):
|
||||
"""
|
||||
Checks if registration api is v1
|
||||
|
||||
@@ -18,7 +18,6 @@ from opaque_keys.edx.locator import CourseLocator
|
||||
|
||||
from lms.djangoapps.verify_student.models import ManualVerification
|
||||
from openedx.core.djangoapps.django_comment_common.models import assign_role
|
||||
from openedx.core.djangoapps.user_authn.utils import generate_password
|
||||
from openedx.core.djangoapps.user_authn.views.registration_form import AccountCreationForm
|
||||
from openedx.features.course_experience import course_home_url_name
|
||||
from common.djangoapps.student.helpers import (
|
||||
@@ -37,6 +36,8 @@ from common.djangoapps.student.models import (
|
||||
)
|
||||
from common.djangoapps.util.json_request import JsonResponse
|
||||
|
||||
from edx_django_utils.user import generate_password
|
||||
|
||||
|
||||
def auto_auth(request): # pylint: disable=too-many-statements
|
||||
"""
|
||||
|
||||
@@ -52,7 +52,7 @@ from openedx.core.djangoapps.user_api.accounts.api import (
|
||||
from openedx.core.djangoapps.user_api.preferences import api as preferences_api
|
||||
from openedx.core.djangoapps.user_authn.cookies import set_logged_in_cookies
|
||||
from openedx.core.djangoapps.user_authn.utils import (
|
||||
generate_password, generate_username_suggestions, is_registration_api_v1
|
||||
generate_username_suggestions, is_registration_api_v1
|
||||
)
|
||||
from openedx.core.djangoapps.user_authn.views.registration_form import (
|
||||
AccountCreationForm,
|
||||
@@ -80,6 +80,8 @@ from common.djangoapps.track import segment
|
||||
from common.djangoapps.util.db import outer_atomic
|
||||
from common.djangoapps.util.json_request import JsonResponse
|
||||
|
||||
from edx_django_utils.user import generate_password
|
||||
|
||||
log = logging.getLogger("edx.student")
|
||||
AUDIT_LOG = logging.getLogger("audit")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user