Merge pull request #17790 from edx/bmedx/user_retirement_utils

Add some common functionality for retiring users
This commit is contained in:
Brian Mesick
2018-04-02 14:41:52 -04:00
committed by GitHub
10 changed files with 334 additions and 9 deletions

View File

@@ -574,6 +574,14 @@ ENTERPRISE_REPORTING_SECRET = AUTH_TOKENS.get(
ENTERPRISE_REPORTING_SECRET
)
############### Settings for Retirement #####################
RETIRED_USERNAME_FMT = ENV_TOKENS.get('RETIRED_USERNAME_FMT', RETIRED_USERNAME_FMT)
RETIRED_EMAIL_FMT = ENV_TOKENS.get('RETIRED_EMAIL_FMT', RETIRED_EMAIL_FMT)
RETIREMENT_SERVICE_WORKER_USERNAME = ENV_TOKENS.get(
'RETIREMENT_SERVICE_WORKER_USERNAME',
RETIREMENT_SERVICE_WORKER_USERNAME
)
####################### Plugin Settings ##########################
from openedx.core.djangoapps.plugins import plugin_settings, constants as plugin_constants

View File

@@ -127,6 +127,11 @@ from lms.envs.common import (
VIDEO_IMAGE_SETTINGS,
VIDEO_TRANSCRIPTS_SETTINGS,
RETIRED_USERNAME_FMT,
RETIRED_EMAIL_FMT,
RETIRED_USER_SALTS,
RETIREMENT_SERVICE_WORKER_USERNAME,
# Methods to derive settings
_make_mako_template_dirs,
_make_locale_paths,

View File

@@ -46,6 +46,7 @@ from opaque_keys.edx.keys import CourseKey
from pytz import UTC
from six import text_type
from slumber.exceptions import HttpClientError, HttpServerError
from user_util import user_util
import dogstats_wrapper as dog_stats_api
import lms.lib.comment_client as cc
@@ -154,9 +155,9 @@ def anonymous_id_for_user(user, course_id, save=True):
# include the secret key as a salt, and to make the ids unique across different LMS installs.
hasher = hashlib.md5()
hasher.update(settings.SECRET_KEY)
hasher.update(unicode(user.id))
hasher.update(text_type(user.id))
if course_id:
hasher.update(unicode(course_id).encode('utf-8'))
hasher.update(text_type(course_id).encode('utf-8'))
digest = hasher.hexdigest()
if not hasattr(user, '_anonymous_id'):
@@ -199,6 +200,74 @@ def user_by_anonymous_id(uid):
return None
def is_username_retired(username):
"""
Checks to see if the given username has been previously retired
"""
locally_hashed_usernames = user_util.get_all_retired_usernames(
username,
settings.RETIRED_USER_SALTS,
settings.RETIRED_USERNAME_FMT
)
return User.objects.filter(username__in=list(locally_hashed_usernames)).exists()
def get_retired_username_by_username(username):
"""
Returns a "retired username" hashed using the newest configured salt
"""
return user_util.get_retired_username(username, settings.RETIRED_USER_SALTS, settings.RETIRED_USERNAME_FMT)
def get_retired_email_by_email(email):
"""
Returns a "retired email" hashed using the newest configured salt
"""
return user_util.get_retired_email(email, settings.RETIRED_USER_SALTS, settings.RETIRED_EMAIL_FMT)
def get_all_retired_usernames_by_username(username):
"""
Returns a generator of "retired usernames", one hashed with each
configured salt. Used for finding out if the given username has
ever been used and retired.
"""
return user_util.get_all_retired_usernames(username, settings.RETIRED_USER_SALTS, settings.RETIRED_USERNAME_FMT)
def get_all_retired_emails_by_email(email):
"""
Returns a generator of "retired emails", one hashed with each
configured salt. Used for finding out if the given email has
ever been used and retired.
"""
return user_util.get_all_retired_emails(email, settings.RETIRED_USER_SALTS, settings.RETIRED_EMAIL_FMT)
def get_potentially_retired_user_by_username_and_hash(username, hashed_username):
"""
To assist in the retirement process this method will:
- Confirm that any locally hashed username matches the passed in one
(in case of salt mismatches with the upstream script).
- Attempt to return a User object based on the username, or if it
does not exist, the any hashed username salted with the historical
salts.
"""
locally_hashed_usernames = list(get_all_retired_usernames_by_username(username))
if hashed_username not in locally_hashed_usernames:
raise Exception('Mismatched hashed_username, bad salt?')
try:
return User.objects.get(username=username)
except User.DoesNotExist:
# The 2nd DoesNotExist will bubble up from here if necessary,
# an assumption is being made here that our hashed username format
# is something that a user cannot create for themselves.
return User.objects.get(username__in=locally_hashed_usernames)
class UserStanding(models.Model):
"""
This table contains a student's account's status.
@@ -1329,7 +1398,7 @@ class CourseEnrollment(models.Model):
Also emits relevant events for analytics purposes.
"""
if mode is None:
mode = _default_course_mode(unicode(course_key))
mode = _default_course_mode(text_type(course_key))
# All the server-side checks for whether a user is allowed to enroll.
try:
course = CourseOverview.get_from_id(course_key)
@@ -1337,7 +1406,7 @@ class CourseEnrollment(models.Model):
# This is here to preserve legacy behavior which allowed enrollment in courses
# announced before the start of content creation.
if check_access:
log.warning(u"User %s failed to enroll in non-existent course %s", user.username, unicode(course_key))
log.warning(u"User %s failed to enroll in non-existent course %s", user.username, text_type(course_key))
raise NonExistentCourseError
if check_access:
@@ -1490,7 +1559,7 @@ class CourseEnrollment(models.Model):
assert isinstance(course_id_partial, CourseKey)
assert not course_id_partial.run # None or empty string
course_key = CourseKey.from_string('/'.join([course_id_partial.org, course_id_partial.course, '']))
querystring = unicode(course_key)
querystring = text_type(course_key)
try:
return cls.objects.filter(
user=user,
@@ -1881,7 +1950,7 @@ class CourseEnrollment(models.Model):
Returns:
Unicode cache key
"""
return cls.COURSE_ENROLLMENT_CACHE_KEY.format(user_id, unicode(course_key))
return cls.COURSE_ENROLLMENT_CACHE_KEY.format(user_id, text_type(course_key))
@classmethod
def _get_enrollment_state(cls, user, course_key):
@@ -1958,7 +2027,7 @@ def invalidate_enrollment_mode_cache(sender, instance, **kwargs): # pylint: dis
cache_key = CourseEnrollment.cache_key_name(
instance.user.id,
unicode(instance.course_id)
text_type(instance.course_id)
)
cache.delete(cache_key)
@@ -2412,7 +2481,7 @@ class LinkedInAddToProfileConfiguration(ConfigurationModel):
return (
u"{partner}-{course_key}_{cert_mode}-{target}".format(
partner=self.trk_partner_name,
course_key=unicode(course_key),
course_key=text_type(course_key),
cert_mode=cert_mode,
target=target
)

View File

@@ -0,0 +1,184 @@
"""
Test user retirement methods
TODO: When the hasher is working actually test it here with multiple salts
"""
from django.conf import settings
from django.contrib.auth.models import User
import pytest
from student.models import (
get_all_retired_emails_by_email,
get_all_retired_usernames_by_username,
get_potentially_retired_user_by_username_and_hash,
get_retired_email_by_email,
get_retired_username_by_username,
is_username_retired
)
from student.tests.factories import UserFactory
# Tell pytest it's ok to user the Django db
pytestmark = pytest.mark.django_db
# Make sure our settings are sane
assert "{}" in settings.RETIRED_USERNAME_FMT
assert "{}" in settings.RETIRED_EMAIL_FMT
RETIRED_USERNAME_START, RETIRED_USERNAME_END = settings.RETIRED_USERNAME_FMT.split("{}")
RETIRED_EMAIL_START, RETIRED_EMAIL_END = settings.RETIRED_EMAIL_FMT.split("{}")
def check_username_against_fmt(hashed_username):
"""
Checks that the given username is formatted correctly using
our settings. The format string may put the hashed string
anywhere, and the hasher is opaque, so this just looks for
our surrounding strings, if they exist.
"""
assert len(hashed_username) > len(settings.RETIRED_USERNAME_FMT)
if RETIRED_USERNAME_START:
assert hashed_username.startswith(RETIRED_USERNAME_START)
if RETIRED_USERNAME_END:
assert hashed_username.endswith(RETIRED_USERNAME_END)
def check_email_against_fmt(hashed_email):
"""
Checks that the given email is formatted correctly using
our settings. The format string may put the hashed string
anywhere, and the hasher is opaque, so this just looks for
our surrounding strings, if they exist.
"""
assert len(hashed_email) > len(settings.RETIRED_EMAIL_FMT)
if RETIRED_EMAIL_START:
assert hashed_email.startswith(RETIRED_EMAIL_START)
if RETIRED_EMAIL_END:
assert hashed_email.endswith(RETIRED_EMAIL_END)
def test_get_retired_username():
"""
Basic testing of getting retired usernames. The hasher is opaque
to us, we just care that it's succeeding and using our format.
"""
user = UserFactory()
hashed_username = get_retired_username_by_username(user.username)
check_username_against_fmt(hashed_username)
def test_get_all_retired_usernames_by_username():
"""
Check that all salts are used for this method and return expected
formats.
"""
user = UserFactory()
hashed_usernames = list(get_all_retired_usernames_by_username(user.username))
assert len(hashed_usernames) == len(settings.RETIRED_USER_SALTS)
for hashed_username in hashed_usernames:
check_username_against_fmt(hashed_username)
# Make sure hashes are unique
assert len(hashed_usernames) == len(set(hashed_usernames))
def test_is_username_retired_is_retired():
"""
Check functionality of is_username_retired when username is retired
"""
user = UserFactory()
original_username = user.username
retired_username = get_retired_username_by_username(user.username)
# Fake username retirement.
user.username = retired_username
user.save()
assert is_username_retired(original_username)
def test_is_username_retired_not_retired():
"""
Check functionality of is_username_retired when username is not retired
"""
user = UserFactory()
assert not is_username_retired(user.username)
def test_get_retired_email():
"""
Basic testing of retired emails.
"""
user = UserFactory()
hashed_email = get_retired_email_by_email(user.email)
check_email_against_fmt(hashed_email)
def test_get_all_retired_email_by_email():
"""
Check that all salts are used for this method and return expected
formats.
"""
user = UserFactory()
hashed_emails = list(get_all_retired_emails_by_email(user.email))
assert len(hashed_emails) == len(settings.RETIRED_USER_SALTS)
for hashed_email in hashed_emails:
check_email_against_fmt(hashed_email)
# Make sure hashes are unique
assert len(hashed_emails) == len(set(hashed_emails))
def test_get_potentially_retired_user_username_match():
"""
Check that we can pass in an un-retired username and get the
user-to-be-retired back.
"""
user = UserFactory()
hashed_username = get_retired_username_by_username(user.username)
assert get_potentially_retired_user_by_username_and_hash(user.username, hashed_username) == user
def test_get_potentially_retired_user_hashed_match():
"""
Check that we can pass in a hashed username and get the
user-to-be-retired back.
"""
user = UserFactory()
orig_username = user.username
hashed_username = get_retired_username_by_username(orig_username)
# Fake username retirement.
user.username = hashed_username
user.save()
# Check to find the user by original username should fail,
# 2nd check by hashed username should succeed.
assert get_potentially_retired_user_by_username_and_hash(orig_username, hashed_username) == user
def test_get_potentially_retired_user_does_not_exist():
"""
Check that the call to get a user with a non-existent
username and hashed username bubbles up User.DoesNotExist
"""
fake_username = "fake username"
hashed_username = get_retired_username_by_username(fake_username)
with pytest.raises(User.DoesNotExist):
get_potentially_retired_user_by_username_and_hash(fake_username, hashed_username)
def test_get_potentially_retired_user_bad_hash():
"""
Check that the call will raise an exeption if the given hash
of the username doesn't match any salted hashes the system
knows about.
"""
fake_username = "fake username"
with pytest.raises(Exception):
get_potentially_retired_user_by_username_and_hash(fake_username, "bad hash")

View File

@@ -1081,6 +1081,14 @@ FERNET_KEYS = AUTH_TOKENS.get('FERNET_KEYS', FERNET_KEYS)
################# Settings for the maintenance banner #################
MAINTENANCE_BANNER_TEXT = ENV_TOKENS.get('MAINTENANCE_BANNER_TEXT', None)
############### Settings for Retirement #####################
RETIRED_USERNAME_FMT = ENV_TOKENS.get('RETIRED_USERNAME_FMT', RETIRED_USERNAME_FMT)
RETIRED_EMAIL_FMT = ENV_TOKENS.get('RETIRED_EMAIL_FMT', RETIRED_EMAIL_FMT)
RETIREMENT_SERVICE_WORKER_USERNAME = ENV_TOKENS.get(
'RETIREMENT_SERVICE_WORKER_USERNAME',
RETIREMENT_SERVICE_WORKER_USERNAME
)
############################### Plugin Settings ###############################
from openedx.core.djangoapps.plugins import plugin_settings, constants as plugin_constants

View File

@@ -3408,6 +3408,12 @@ COMPLETION_BY_VIEWING_DELAY_MS = 5000
RATELIMIT_ENABLE = True
RATELIMIT_RATE = '120/m'
############### Settings for Retirement #####################
RETIRED_USERNAME_FMT = 'retired__user_{}'
RETIRED_EMAIL_FMT = 'retired__user_{}@retired.invalid'
RETIRED_USER_SALTS = ['abc', '123']
RETIREMENT_SERVICE_WORKER_USERNAME = 'RETIREMENT_SERVICE_USER'
############### Settings for django-fernet-fields ##################
FERNET_KEYS = [
'DUMMY KEY CHANGE BEFORE GOING TO PRODUCTION',

View File

@@ -3,6 +3,7 @@ Permissions classes for User accounts API views.
"""
from __future__ import unicode_literals
from django.conf import settings
from rest_framework import permissions
@@ -13,3 +14,16 @@ class CanDeactivateUser(permissions.BasePermission):
"""
def has_permission(self, request, view):
return request.user.has_perm('student.can_deactivate_users')
class CanRetireUser(permissions.BasePermission):
"""
Grants access to the various retirement API endpoints if the requesting user is
a superuser, the RETIREMENT_SERVICE_USERNAME, or has the explicit permission to
retire a User account.
"""
def has_permission(self, request, view):
return (
request.user.username == settings.RETIREMENT_SERVICE_WORKER_USERNAME or
request.user.is_superuser
)

View File

@@ -3,7 +3,7 @@ Tests for User deactivation API permissions
"""
from django.test import TestCase, RequestFactory
from openedx.core.djangoapps.user_api.accounts.permissions import CanDeactivateUser
from openedx.core.djangoapps.user_api.accounts.permissions import CanDeactivateUser, CanRetireUser
from student.tests.factories import ContentTypeFactory, PermissionFactory, SuperuserFactory, UserFactory
@@ -38,3 +38,30 @@ class CanDeactivateUserTest(TestCase):
self.request.user = UserFactory()
result = CanDeactivateUser().has_permission(self.request, None)
self.assertFalse(result)
class CanRetireUserTest(TestCase):
""" Tests for user retirement API permissions """
def setUp(self):
super(CanRetireUserTest, self).setUp()
self.request = RequestFactory().get('/test/url')
def test_api_permission_superuser(self):
self.request.user = SuperuserFactory()
result = CanRetireUser().has_permission(self.request, None)
self.assertTrue(result)
def test_api_permission_user_granted_permission(self):
user = UserFactory()
self.request.user = user
with self.settings(RETIREMENT_SERVICE_WORKER_USERNAME=user.username):
result = CanRetireUser().has_permission(self.request, None)
self.assertTrue(result)
def test_api_permission_user_without_permission(self):
self.request.user = UserFactory()
result = CanRetireUser().has_permission(self.request, None)
self.assertFalse(result)

View File

@@ -92,3 +92,6 @@ SECRET_KEY = 'insecure-secret-key'
TRACK_MAX_EVENT = 50000
USE_TZ = True
RETIREMENT_SERVICE_WORKER_USERNAME = 'RETIREMENT_SERVICE_USER'
RETIRED_USERNAME_PREFIX = 'retired__user_'

View File

@@ -122,6 +122,7 @@ sortedcontainers==0.9.2
stevedore==1.10.0
sure==1.2.3
sympy==0.7.1
user-util==0.1.1
xmltodict==0.4.1
django-ratelimit-backend==1.1.1
unicodecsv==0.14.1