From c5c9554dbc3e7328d73197e2b81c1c464f74be25 Mon Sep 17 00:00:00 2001 From: Chris Rossi Date: Fri, 20 Dec 2013 11:42:41 -0500 Subject: [PATCH] Test LinkedinAPI --- .../linkedin/management/commands/__init__.py | 89 +++++++------ .../management/commands/linkedin_findusers.py | 2 +- .../management/commands/linkedin_login.py | 2 +- .../management/commands/linkedin_mailusers.py | 2 +- .../management/commands/tests/test_api.py | 123 ++++++++++++++++++ lms/envs/test.py | 3 + 6 files changed, 176 insertions(+), 45 deletions(-) create mode 100644 lms/djangoapps/linkedin/management/commands/tests/test_api.py diff --git a/lms/djangoapps/linkedin/management/commands/__init__.py b/lms/djangoapps/linkedin/management/commands/__init__.py index c62c913f82..0a97a94823 100644 --- a/lms/djangoapps/linkedin/management/commands/__init__.py +++ b/lms/djangoapps/linkedin/management/commands/__init__.py @@ -16,26 +16,28 @@ class LinkedinAPI(object): """ Encapsulates the LinkedIn API. """ - def __init__(self): + def __init__(self, command): config = getattr(settings, "LINKEDIN_API", None) if not config: raise CommandError("LINKEDIN_API is not configured") self.config = config try: - self.tokens = LinkedInToken.objects.get() + self.token = LinkedInToken.objects.get() except LinkedInToken.DoesNotExist: - self.tokens = None + self.token = None + self.command = command self.state = str(uuid.uuid4()) def http_error(self, error, message): """ Handle an unexpected HTTP response. """ - print "!!ERROR!!" - print error - print error.read() + stderr = self.command.stderr + stderr.write("!!ERROR!!") + stderr.write(error) + stderr.write(error.read()) raise CommandError(message) def authorization_url(self): @@ -57,53 +59,56 @@ class LinkedinAPI(object): assert query['state'][0] == self.state, (query['state'][0], self.state) return query['code'][0] + def access_token_url(self, code): + config = self.config + return ("https://www.linkedin.com/uas/oauth2/accessToken" + "?grant_type=authorization_code" + "&code=%s&redirect_uri=%s&client_id=%s&client_secret=%s" % ( + code, config['REDIRECT_URI'], config['CLIENT_ID'], + config['CLIENT_SECRET'])) + + def call_json_api(self, url): + try: + request = urllib2.Request(url, headers={'x-li-format': 'json'}) + response = urllib2.urlopen(request).read() + return json.loads(response) + except urllib2.HTTPError, error: + self.http_error(error, "Error calling LinkedIn API") + def get_access_token(self, code): """ Given an authorization code, get an access token. """ - config = self.config - url = ("https://www.linkedin.com/uas/oauth2/accessToken" - "?grant_type=authorization_code" - "&code=%s&redirect_uri=%s&client_id=%s&client_secret=%s" % ( - code, config['REDIRECT_URI'], config['CLIENT_ID'], - config['CLIENT_SECRET'])) - + response = self.call_json_api(self.access_token_url(code)) + access_token = response['access_token'] try: - response = urllib2.urlopen(url).read() - except urllib2.HTTPError, error: - self.http_error(error, "Unable to retrieve access token") - - access_token = json.loads(response)['access_token'] - try: - tokens = LinkedInToken.objects.get() - tokens.access_token = access_token - tokens.authorization_code = code + token = LinkedInToken.objects.get() + token.access_token = access_token except LinkedInToken.DoesNotExist: - tokens = LinkedInToken(access_token=access_token) - tokens.save() - self.tokens = tokens + token = LinkedInToken(access_token=access_token) + token.save() + self.token = token return access_token + def require_token(self): + if self.token is None: + raise CommandError( + "You must log in to LinkedIn in order to use this script. " + "Please use the 'login' command to log in to LinkedIn.") + + def batch_url(self, emails): + self.require_token() + queries = ','.join(("email=" + email for email in emails)) + url = "https://api.linkedin.com/v1/people::(%s):(id)" % queries + url += "?oauth2_access_token=%s" % self.token.access_token + return url + def batch(self, emails): """ Get the LinkedIn status for a batch of emails. """ - if self.tokens is None: - raise CommandError( - "You must log in to LinkedIn in order to use this script. " - "Please use the 'login' command to log in to LinkedIn.") - emails = list(emails) # realize generator since we traverse twice - queries = ','.join(("email=" + email for email in emails)) - url = "https://api.linkedin.com/v1/people::(%s):(id)" % queries - url += "?oauth2_access_token=%s" % self.tokens.access_token - request = urllib2.Request(url, headers={'x-li-format': 'json'}) - try: - response = urllib2.urlopen(request).read() - values = json.loads(response)['values'] - accounts = set(value['_key'][6:] for value in values) - return (email in accounts for email in emails) - except urllib2.HTTPError, error: - self.http_error(error, "Unable to access People API") - return (True for email in emails) + response = self.call_json_api(self.batch_url(emails)) + accounts = set(value['_key'][6:] for value in response['values']) + return (email in accounts for email in emails) diff --git a/lms/djangoapps/linkedin/management/commands/linkedin_findusers.py b/lms/djangoapps/linkedin/management/commands/linkedin_findusers.py index 2f2a14ffee..520b89168e 100644 --- a/lms/djangoapps/linkedin/management/commands/linkedin_findusers.py +++ b/lms/djangoapps/linkedin/management/commands/linkedin_findusers.py @@ -75,7 +75,7 @@ class Command(BaseCommand): """ Check users. """ - api = LinkedinAPI() + api = LinkedinAPI(self) recheck = options.pop('recheck', False) force = options.pop('force', False) if force: diff --git a/lms/djangoapps/linkedin/management/commands/linkedin_login.py b/lms/djangoapps/linkedin/management/commands/linkedin_login.py index 7f835892cb..b2df0112c3 100644 --- a/lms/djangoapps/linkedin/management/commands/linkedin_login.py +++ b/lms/djangoapps/linkedin/management/commands/linkedin_login.py @@ -19,7 +19,7 @@ class Command(BaseCommand): def handle(self, *args, **options): """ """ - api = LinkedinAPI() + api = LinkedinAPI(self) print "Let's log into your LinkedIn account." print "Start by visiting this url:" print api.authorization_url() diff --git a/lms/djangoapps/linkedin/management/commands/linkedin_mailusers.py b/lms/djangoapps/linkedin/management/commands/linkedin_mailusers.py index 67a98e4344..b02fafeabe 100644 --- a/lms/djangoapps/linkedin/management/commands/linkedin_mailusers.py +++ b/lms/djangoapps/linkedin/management/commands/linkedin_mailusers.py @@ -43,7 +43,7 @@ class Command(BaseCommand): def __init__(self): super(BaseCommand, self).__init__() - self.api = LinkedinAPI() + self.api = LinkedinAPI(self) def handle(self, *args, **options): whitelist = self.api.config.get('EMAIL_WHITELIST') diff --git a/lms/djangoapps/linkedin/management/commands/tests/test_api.py b/lms/djangoapps/linkedin/management/commands/tests/test_api.py new file mode 100644 index 0000000000..8dc2915a48 --- /dev/null +++ b/lms/djangoapps/linkedin/management/commands/tests/test_api.py @@ -0,0 +1,123 @@ +import mock +import StringIO + +from django.core.management.base import CommandError +from django.test import TestCase + +from linkedin.management.commands import LinkedinAPI +from linkedin.models import LinkedInToken + + +class LinkedinAPITests(TestCase): + + def setUp(self): + patcher = mock.patch('linkedin.management.commands.uuid.uuid4') + uuid4 = patcher.start() + uuid4.return_value = '0000-0000' + self.addCleanup(patcher.stop) + + def make_one(self): + return LinkedinAPI(DummyCommand()) + + @mock.patch('django.conf.settings.LINKEDIN_API', None) + def test_ctor_no_api_config(self): + with self.assertRaises(CommandError): + self.make_one() + + def test_ctor_no_token(self): + api = self.make_one() + self.assertEqual(api.token, None) + + def test_ctor_with_token(self): + token = LinkedInToken() + token.save() + api = self.make_one() + self.assertEqual(api.token, token) + + def test_http_error(self): + api = self.make_one() + with self.assertRaises(CommandError): + api.http_error(DummyHTTPError(), "That didn't work") + self.assertEqual( + api.command.stderr.getvalue(), + "!!ERROR!!" + "HTTPError OMG!" + "OMG OHNOES!") + + def test_authorization_url(self): + api = self.make_one() + self.assertEqual( + api.authorization_url(), + 'https://www.linkedin.com/uas/oauth2/authorization?' + 'response_type=code&client_id=12345&state=0000-0000&' + 'redirect_uri=http://bar.foo') + + def test_get_authorization_code(self): + fut = self.make_one().get_authorization_code + self.assertEqual( + fut('http://foo.bar/?state=0000-0000&code=54321'), '54321') + + def test_access_token_url(self): + fut = self.make_one().access_token_url + self.assertEqual( + fut('54321'), + 'https://www.linkedin.com/uas/oauth2/accessToken?' + 'grant_type=authorization_code&code=54321&' + 'redirect_uri=http://bar.foo&client_id=12345&client_secret=SECRET') + + def test_get_access_token(self): + api = self.make_one() + api.call_json_api = mock.Mock(return_value={'access_token': '777'}) + self.assertEqual(api.get_access_token('54321'), '777') + token = LinkedInToken.objects.get() + self.assertEqual(token.access_token, '777') + + def test_get_access_token_overwrite_previous(self): + LinkedInToken(access_token='888').save() + api = self.make_one() + api.call_json_api = mock.Mock(return_value={'access_token': '777'}) + self.assertEqual(api.get_access_token('54321'), '777') + token = LinkedInToken.objects.get() + self.assertEqual(token.access_token, '777') + + def test_require_token_no_token(self): + fut = self.make_one().require_token + with self.assertRaises(CommandError): + fut() + + def test_require_token(self): + LinkedInToken().save() + fut = self.make_one().require_token + fut() + + def test_batch_url(self): + LinkedInToken(access_token='777').save() + fut = self.make_one().batch_url + emails = ['foo@bar', 'bar@foo'] + self.assertEquals( + fut(emails), + 'https://api.linkedin.com/v1/people::(email=foo@bar,email=bar@foo):' + '(id)?oauth2_access_token=777') + + def test_batch(self): + LinkedInToken(access_token='777').save() + api = self.make_one() + api.call_json_api = mock.Mock(return_value={ + 'values': [{'_key': 'email=bar@foo'}]}) + emails = ['foo@bar', 'bar@foo'] + self.assertEqual(list(api.batch(emails)), [False, True]) + + +class DummyCommand(object): + + def __init__(self): + self.stderr = StringIO.StringIO() + + +class DummyHTTPError(object): + + def __str__(self): + return 'HTTPError OMG!' + + def read(self): + return 'OMG OHNOES!' diff --git a/lms/envs/test.py b/lms/envs/test.py index e718cc3f71..3ec4aaa158 100644 --- a/lms/envs/test.py +++ b/lms/envs/test.py @@ -260,6 +260,9 @@ LTI_PORT = 8765 ############################ LinkedIn Integration ############################# INSTALLED_APPS += ('linkedin',) LINKEDIN_API = { + 'CLIENT_ID': '12345', + 'CLIENT_SECRET': 'SECRET', + 'REDIRECT_URI': 'http://bar.foo', 'COMPANY_NAME': 'edX', 'COMPANY_ID': '0000000', 'EMAIL_FROM': 'The Team ',