Add --update option to create_dot_access (#21172)
This commit is contained in:
@@ -68,72 +68,106 @@ class Command(BaseCommand):
|
||||
dest='scopes',
|
||||
default='',
|
||||
help='Comma-separated list of scopes that this application will be allowed to request.')
|
||||
parser.add_argument('--update',
|
||||
action='store_true',
|
||||
dest='update',
|
||||
help='If application and/or access already exist, update values.')
|
||||
|
||||
def _create_application_access(self, application, scopes):
|
||||
def _create_application(self, user, app_name, application_kwargs):
|
||||
"""
|
||||
If scopes are supplied, creates an oauth_dispatch ApplicationAccess for the provided
|
||||
scopes and DOT application.
|
||||
Create new application with given User, name, and option values.
|
||||
"""
|
||||
if not scopes:
|
||||
return
|
||||
application = Application.objects.create(
|
||||
user=user, name=app_name, **application_kwargs
|
||||
)
|
||||
logger.info('Created {} application with id: {}, client_id: {}, and client_secret: {}'.format(
|
||||
app_name,
|
||||
application.id,
|
||||
application.client_id,
|
||||
application.client_secret,
|
||||
))
|
||||
return application
|
||||
|
||||
if ApplicationAccess.objects.filter(application_id=application.id).exists():
|
||||
def _update_application(self, application, application_kwargs):
|
||||
"""
|
||||
Update given application with option values.
|
||||
"""
|
||||
for key, value in application_kwargs.items():
|
||||
setattr(application, key, value)
|
||||
application.save()
|
||||
logger.info('Updated {} application with id: {}, client_id: {}, and client_secret: {}'.format(
|
||||
application.name,
|
||||
application.id,
|
||||
application.client_id,
|
||||
application.client_secret,
|
||||
))
|
||||
|
||||
def _create_or_update_access(self, application, scopes, update):
|
||||
"""
|
||||
Create application access with specified scopes.
|
||||
|
||||
If application access already exists, then:
|
||||
* Update with specified scopes if update=True,
|
||||
* Otherwise do nothing.
|
||||
"""
|
||||
access = ApplicationAccess.objects.filter(application_id=application.id).first()
|
||||
|
||||
if access and update:
|
||||
access.scopes = scopes
|
||||
access.save()
|
||||
logger.info('Updated application access for {} with scopes: {}'.format(
|
||||
application.name,
|
||||
scopes,
|
||||
))
|
||||
elif access:
|
||||
logger.info('Application access for application {} already exists.'.format(
|
||||
application.name,
|
||||
))
|
||||
return
|
||||
|
||||
application_access = ApplicationAccess.objects.create(
|
||||
application_id=application.id,
|
||||
scopes=scopes,
|
||||
)
|
||||
application_access.save()
|
||||
logger.info('Created application access for {} with scopes: {}'.format(
|
||||
application.name,
|
||||
application_access.scopes,
|
||||
))
|
||||
else:
|
||||
application_access = ApplicationAccess.objects.create(
|
||||
application_id=application.id,
|
||||
scopes=scopes,
|
||||
)
|
||||
logger.info('Created application access for {} with scopes: {}'.format(
|
||||
application.name,
|
||||
application_access.scopes,
|
||||
))
|
||||
|
||||
def handle(self, *args, **options):
|
||||
app_name = options['name']
|
||||
username = options['username']
|
||||
grant_type = options['grant_type']
|
||||
user = User.objects.get(username=username)
|
||||
app_name = options['name']
|
||||
update = options['update']
|
||||
|
||||
redirect_uris = options['redirect_uris']
|
||||
skip_authorization = options['skip_authorization']
|
||||
client_type = Application.CLIENT_PUBLIC if options['public'] else Application.CLIENT_CONFIDENTIAL
|
||||
grant_type = options['grant_type']
|
||||
skip_authorization = options['skip_authorization']
|
||||
client_id = options['client_id']
|
||||
client_secret = options['client_secret']
|
||||
scopes = options['scopes']
|
||||
|
||||
user = User.objects.get(username=username)
|
||||
|
||||
if Application.objects.filter(user=user, name=app_name).exists():
|
||||
logger.info('Application with name {} and user {} already exists.'.format(
|
||||
app_name,
|
||||
username
|
||||
))
|
||||
application = Application.objects.get(user=user, name=app_name)
|
||||
self._create_application_access(application, scopes)
|
||||
return
|
||||
|
||||
create_kwargs = dict(
|
||||
name=app_name,
|
||||
user=user,
|
||||
application_kwargs = dict(
|
||||
redirect_uris=redirect_uris,
|
||||
client_type=client_type,
|
||||
authorization_grant_type=grant_type,
|
||||
skip_authorization=skip_authorization
|
||||
)
|
||||
if client_id:
|
||||
create_kwargs['client_id'] = client_id
|
||||
application_kwargs['client_id'] = client_id
|
||||
if client_secret:
|
||||
create_kwargs['client_secret'] = client_secret
|
||||
application_kwargs['client_secret'] = client_secret
|
||||
|
||||
application = Application.objects.create(**create_kwargs)
|
||||
application.save()
|
||||
logger.info('Created {} application with id: {}, client_id: {}, and client_secret: {}'.format(
|
||||
app_name,
|
||||
application.id,
|
||||
application.client_id,
|
||||
application.client_secret
|
||||
))
|
||||
self._create_application_access(application, scopes)
|
||||
application = Application.objects.filter(user=user, name=app_name).first()
|
||||
if application and update:
|
||||
self._update_application(application, application_kwargs)
|
||||
elif application:
|
||||
logger.info('Application with name {} and user {} already exists.'.format(
|
||||
app_name,
|
||||
username
|
||||
))
|
||||
else:
|
||||
application = self._create_application(user, app_name, application_kwargs)
|
||||
|
||||
scopes = options['scopes']
|
||||
if scopes:
|
||||
self._create_or_update_access(application, scopes, update)
|
||||
|
||||
@@ -29,6 +29,62 @@ class TestCreateDotApplication(TestCase):
|
||||
super(TestCreateDotApplication, self).tearDown()
|
||||
Application.objects.filter(user=self.user).delete()
|
||||
|
||||
def test_update_dot_application(self):
|
||||
APP_NAME = "update_test_application"
|
||||
URI_OLD = "https://example.com/old"
|
||||
URI_NEW = "https://example.com/new"
|
||||
SCOPES_X = ["email", "profile", "user_id"]
|
||||
SCOPES_Y = ["email", "profile"]
|
||||
base_call_args = [
|
||||
APP_NAME,
|
||||
self.user.username,
|
||||
"--update",
|
||||
"--grant-type",
|
||||
Application.GRANT_CLIENT_CREDENTIALS,
|
||||
"--public",
|
||||
"--redirect-uris",
|
||||
]
|
||||
|
||||
# Make sure we can create Application with --update
|
||||
call_args = base_call_args + [URI_OLD]
|
||||
call_command(Command(), *call_args)
|
||||
app = Application.objects.get(name=APP_NAME)
|
||||
self.assertEqual(app.redirect_uris, URI_OLD)
|
||||
with self.assertRaises(ApplicationAccess.DoesNotExist):
|
||||
ApplicationAccess.objects.get(application_id=app.id)
|
||||
|
||||
# Make sure we can call again with no changes
|
||||
call_args = base_call_args + [URI_OLD]
|
||||
call_command(Command(), *call_args)
|
||||
app = Application.objects.get(name=APP_NAME)
|
||||
self.assertEqual(app.redirect_uris, URI_OLD)
|
||||
with self.assertRaises(ApplicationAccess.DoesNotExist):
|
||||
ApplicationAccess.objects.get(application_id=app.id)
|
||||
|
||||
# Make sure calling with new URI changes URI, but does not add access
|
||||
call_args = base_call_args + [URI_NEW]
|
||||
call_command(Command(), *call_args)
|
||||
app = Application.objects.get(name=APP_NAME)
|
||||
self.assertEqual(app.redirect_uris, URI_NEW)
|
||||
with self.assertRaises(ApplicationAccess.DoesNotExist):
|
||||
ApplicationAccess.objects.get(application_id=app.id)
|
||||
|
||||
# Make sure calling with scopes adds access
|
||||
call_args = base_call_args + [URI_NEW, "--scopes", ",".join(SCOPES_X)]
|
||||
call_command(Command(), *call_args)
|
||||
app = Application.objects.get(name=APP_NAME)
|
||||
self.assertEqual(app.redirect_uris, URI_NEW)
|
||||
access = ApplicationAccess.objects.get(application_id=app.id)
|
||||
self.assertEqual(access.scopes, SCOPES_X)
|
||||
|
||||
# Make sure calling with new scopes changes them
|
||||
call_args = base_call_args + [URI_NEW, "--scopes", ",".join(SCOPES_Y)]
|
||||
call_command(Command(), *call_args)
|
||||
app = Application.objects.get(name=APP_NAME)
|
||||
self.assertEqual(app.redirect_uris, URI_NEW)
|
||||
access = ApplicationAccess.objects.get(application_id=app.id)
|
||||
self.assertEqual(access.scopes, SCOPES_Y)
|
||||
|
||||
@ddt.data(
|
||||
(None, None, None, None, False, None),
|
||||
(None, None, 'client-abc', None, False, None),
|
||||
|
||||
Reference in New Issue
Block a user