diff --git a/openedx/core/djangolib/oauth2_retirement_utils.py b/openedx/core/djangolib/oauth2_retirement_utils.py new file mode 100644 index 0000000000..2d02b6964d --- /dev/null +++ b/openedx/core/djangolib/oauth2_retirement_utils.py @@ -0,0 +1,52 @@ +""" +Removes user PII from OAuth2 models. +""" + +from oauth2_provider.models import ( + AccessToken as DOTAccessToken, + Application as DOTApplication, + Grant as DOTGrant, + RefreshToken as DOTRefreshToken, +) +from provider.oauth2.models import ( + AccessToken as DOPAccessToken, + RefreshToken as DOPRefreshToken, + Grant as DOPGrant, +) + + +class ModelRetirer(object): + """ + Given a list of model names, provides methods for deleting instances of + those models. + """ + + def __init__(self, models_to_retire): + self._models_to_retire = models_to_retire + + def retire_user_by_id(self, user_id): + for model in self._models_to_retire: + self._delete_user_id_from(model=model, user_id=user_id) + + @staticmethod + def _delete_user_id_from(model, user_id): + """ + Deletes a user from a model by their user id. + """ + user_query_results = model.objects.filter(user_id=user_id) + + if not user_query_results.exists(): + return False + + user_query_results.delete() + return True + + +def retire_dot_oauth2_models(user): + dot_models = [DOTAccessToken, DOTApplication, DOTGrant, DOTRefreshToken] + ModelRetirer(dot_models).retire_user_by_id(user.id) + + +def retire_dop_oauth2_models(user): + dop_models = [DOPAccessToken, DOPGrant, DOPRefreshToken] + ModelRetirer(dop_models).retire_user_by_id(user.id) diff --git a/openedx/core/djangolib/tests/test_oauth2_retirement_utils.py b/openedx/core/djangolib/tests/test_oauth2_retirement_utils.py new file mode 100644 index 0000000000..fcc13ce344 --- /dev/null +++ b/openedx/core/djangolib/tests/test_oauth2_retirement_utils.py @@ -0,0 +1,89 @@ +""" +Contains tests for OAuth2 model-retirement methods. +""" + +import datetime + +from django.test import TestCase +from openedx.core.djangoapps.oauth_dispatch.tests import factories +from student.tests.factories import UserFactory +from oauth2_provider.models import ( + AccessToken as DOTAccessToken, + Application as DOTApplication, + RefreshToken as DOTRefreshToken, + Grant as DOTGrant, +) +from provider.oauth2.models import ( + AccessToken as DOPAccessToken, + RefreshToken as DOPRefreshToken, + Grant as DOPGrant, + Client as DOPClient, +) + +from ..oauth2_retirement_utils import ( + retire_dop_oauth2_models, + retire_dot_oauth2_models, +) + + +class RetireDOTModelsTest(TestCase): + + def test_delete_dot_models(self): + user = UserFactory.create() + app = factories.ApplicationFactory(user=user) + access_token = factories.AccessTokenFactory( + user=user, + application=app + ) + factories.RefreshTokenFactory( + user=user, + application=app, + access_token=access_token, + ) + DOTGrant.objects.create( + user=user, + application=app, + expires=datetime.datetime(2018, 1, 1), + ) + + retire_dot_oauth2_models(user) + + applications = DOTApplication.objects.filter(user_id=user.id) + access_tokens = DOTAccessToken.objects.filter(user_id=user.id) + refresh_tokens = DOTRefreshToken.objects.filter(user_id=user.id) + grants = DOTGrant.objects.filter(user=user) + + query_sets = [applications, access_tokens, refresh_tokens, grants] + + for query_set in query_sets: + self.assertFalse(query_set.exists()) + + +class RetireDOPModelsTest(TestCase): + + def test_delete_dop_models(self): + user = UserFactory.create() + client = DOPClient.objects.create( + user=user, + client_type=1, + ) + access_token = DOPAccessToken.objects.create( + user=user, + client=client, + ) + DOPRefreshToken.objects.create( + user=user, + client=client, + access_token=access_token, + ) + + retire_dop_oauth2_models(user) + + access_tokens = DOPAccessToken.objects.filter(user=user) + refresh_tokens = DOPRefreshToken.objects.filter(user=user) + grants = DOPGrant.objects.filter(user_id=user.id) + + query_sets = [access_tokens, refresh_tokens, grants] + + for query_set in query_sets: + self.assertFalse(query_set.exists())