chore: add multiple users to retirement queue.

This commit is contained in:
Hamza442
2023-01-05 15:54:21 +05:00
parent 3877201dd2
commit 265ffda0aa
2 changed files with 117 additions and 36 deletions

View File

@@ -12,7 +12,6 @@ from openedx.core.djangolib.oauth2_retirement_utils import retire_dot_oauth2_mod
from ...models import UserRetirementStatus
logger = logging.getLogger(__name__)
@@ -35,55 +34,97 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument(
'--username',
required=True,
type=str,
help='Username to be retired'
)
parser.add_argument(
'--user_email',
required=True,
type=str,
help='User email address.'
)
parser.add_argument(
'--user_file',
type=str,
help='Comma separated file that have username and user_email of the users that needs to be retired'
)
def check_user_exist(self, user_model, userfile, user_name, useremail):
"""
Function to check if user exists. This function will execute for userfile
or user_name and useremail.
Args:
userfile: file that have username and email of users to be retired
user_name: username of user to be retired
useremail: email of user to be retired
"""
unknown_users = []
users = []
if userfile:
try:
userinfo = open(userfile, 'r')
except Exception as exc:
error_message = f'Error while reading file: {exc}'
logger.error(error_message)
raise CommandError(error_message) # lint-amnesty, pylint: disable=raise-missing-from
for record in userinfo:
userdata = record.split(',')
username = userdata[0].strip()
user_email = userdata[1].strip()
try:
users.append(User.objects.get(username=username, email=user_email))
except user_model.DoesNotExist:
unknown_users.append({username: user_email})
elif user_name and useremail:
try:
users.append(User.objects.get(username=username, email=user_email))
except user_model.DoesNotExist:
unknown_users.append({username: useremail})
else:
raise CommandError("Please provide user_file or username and user_email parameter when runing command")
return users, unknown_users
def handle(self, *args, **options):
"""
Execute the command.
"""
userfile = options['user_file']
user_name = options['username']
useremail = options['user_email']
username = options['username']
user_email = options['user_email']
try:
user = User.objects.get(username=username, email=user_email)
except:
user_model = get_user_model()
users, unknown_users = self.check_user_exist(user_model, userfile, user_name, useremail)
# Raise if found any such user that does not exit
if len(unknown_users) > 0:
error_message = (
'Could not find a user with specified username and email '
'Could not find users with specified username and email '
'address. Make sure you have everything correct before '
'trying again'
)
logger.error(error_message)
raise CommandError(error_message) # lint-amnesty, pylint: disable=raise-missing-from
user_model = get_user_model()
raise CommandError(error_message + f': {unknown_users}') # lint-amnesty, pylint: disable=raise-missing-from
try:
with transaction.atomic():
# Add user to retirement queue.
UserRetirementStatus.create_retirement(user)
# Unlink LMS social auth accounts
UserSocialAuth.objects.filter(user_id=user.id).delete()
# Change LMS password & email
user.email = get_retired_email_by_email(user.email)
user.set_unusable_password()
user.save()
for user in users:
# Add user to retirement queue.
UserRetirementStatus.create_retirement(user)
# Unlink LMS social auth accounts
UserSocialAuth.objects.filter(user_id=user.id).delete()
# Change LMS password & email
user.email = get_retired_email_by_email(user.email)
user.set_unusable_password()
user.save()
# TODO: Unlink social accounts & change password on each IDA.
# Remove the activation keys sent by email to the user for account activation.
Registration.objects.filter(user=user).delete()
# TODO: Unlink social accounts & change password on each IDA.
# Remove the activation keys sent by email to the user for account activation.
Registration.objects.filter(user=user).delete()
# Delete OAuth tokens associated with the user.
retire_dot_oauth2_models(user)
AccountRecovery.retire_recovery_email(user.id)
# Delete OAuth tokens associated with the user.
retire_dot_oauth2_models(user)
AccountRecovery.retire_recovery_email(user.id)
except KeyError:
error_message = f'Username not specified {user}'
logger.error(error_message)
@@ -93,8 +134,7 @@ class Command(BaseCommand):
logger.error(error_message)
raise CommandError(error_message) # lint-amnesty, pylint: disable=raise-missing-from
except Exception as exc: # pylint: disable=broad-except
error_message = f'500 error deactivating account {exc}'
error_message = f'500 error deactivating account: {exc}'
logger.error(error_message)
raise CommandError(error_message) # lint-amnesty, pylint: disable=raise-missing-from
logger.info("User succesfully moved to the retirment pipeline")

View File

@@ -13,29 +13,70 @@ from openedx.core.djangoapps.user_api.accounts.tests.retirement_helpers import (
)
from openedx.core.djangolib.testing.utils import skip_unless_lms # lint-amnesty, pylint: disable=wrong-import-order
from common.djangoapps.student.tests.factories import UserFactory # lint-amnesty, pylint: disable=wrong-import-order
import csv
import os
pytestmark = pytest.mark.django_db
user_file = 'userfile.csv'
def generate_dummy_users():
"""
Function to generate dummy users that needs to be retired
"""
users = []
emails = []
for i in range(1000):
user = UserFactory.create(username=f"user{i}", email=f"user{i}@example.com")
users.append(user.username)
emails.append(user.email)
users_list = [{'username': user, 'email': email} for user, email in zip(users, emails)]
return users_list
def create_user_file(other_email=False):
"""
Function to create a comma spearated file with username and password
Args:
other_email (bool, optional): test user with email mimatch. Defaults to False.
"""
users_to_retire = generate_dummy_users()
if other_email:
users_to_retire[0]['email'] = "other@edx.org"
with open(user_file, 'w', newline='') as file:
write = csv.writer(file)
for user in users_to_retire:
write.writerow(user.values())
def remove_user_file():
"""
Function to remove user file
"""
if os.path.exists(user_file):
os.remove(user_file)
@skip_unless_lms
def test_successful_retire(setup_retirement_states): # lint-amnesty, pylint: disable=redefined-outer-name, unused-argument
user = UserFactory()
user = UserFactory.create(username='user0', email="user0@example.com")
username = user.username
user_email = user.email
call_command('retire_user', username=username, user_email=user_email)
create_user_file()
call_command('retire_user', user_file=user_file)
user = User.objects.get(username=username)
retired_user_status = UserRetirementStatus.objects.all()[0]
assert retired_user_status.original_username == username
assert retired_user_status.original_email == user_email
# Make sure that we have changed the email address linked to the original user
assert user.email != user_email
remove_user_file()
@skip_unless_lms
def test_retire_user_with_usename_email_mismatch(setup_retirement_states): # lint-amnesty, pylint: disable=redefined-outer-name, unused-argument
user = UserFactory()
username = user.username
user_email = "other@edx.org"
with pytest.raises(CommandError, match=r'Could not find a user with specified username and email address'):
call_command('retire_user', username=username, user_email=user_email)
create_user_file(True)
with pytest.raises(CommandError, match=r'Could not find users with specified username and email '):
call_command('retire_user', user_file=user_file)
remove_user_file()