From 04294b04df3eba7433d2323e65a653ac5700a27a Mon Sep 17 00:00:00 2001 From: Alexander Kryklia Date: Fri, 30 Aug 2013 15:46:25 +0300 Subject: [PATCH] Adds oauth checking to acceptance tests --- .../mock_lti_server/mock_lti_server.py | 105 ++++++++++-------- 1 file changed, 59 insertions(+), 46 deletions(-) diff --git a/lms/djangoapps/courseware/mock_lti_server/mock_lti_server.py b/lms/djangoapps/courseware/mock_lti_server/mock_lti_server.py index 8fb1cf4393..2eb2d2e139 100644 --- a/lms/djangoapps/courseware/mock_lti_server/mock_lti_server.py +++ b/lms/djangoapps/courseware/mock_lti_server/mock_lti_server.py @@ -1,15 +1,11 @@ from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler -import json -import urllib import urlparse -import threading - +from requests.packages.oauthlib.oauth1.rfc5849 import signature +import mock from logging import getLogger logger = getLogger(__name__) -# todo - implement oauth - class MockLTIRequestHandler(BaseHTTPRequestHandler): ''' A handler for LTI POST requests. @@ -28,34 +24,41 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler): post_dict = self._post_dict() # Retrieve the POST data - # Log the request logger.debug("LTI provider received POST request {} to path {}".format( str(post_dict), self.path) - ) + ) # Log the request + # Respond only to requests with correct lti endpoint: if self._is_correct_lti_request(): - correct_dict = { - 'user_id': 'default_user_id', - 'oauth_nonce': '22990037033121997701377766132', - 'oauth_timestamp': '1377766132', - 'oauth_consumer_key': 'client_key', - 'lti_version': 'LTI-1p0', - 'oauth_signature_method': 'HMAC-SHA1', - 'oauth_version': '1.0', - 'oauth_signature': 'HGYMAU/G5EMxd0CDOvWubsqxLIY=', - 'lti_message_type': 'basic-lti-launch-request', - 'oauth_callback': 'about:blank' - } + correct_keys = [ + 'user_id', + 'oauth_nonce', + 'oauth_timestamp', + 'oauth_consumer_key', + 'lti_version', + 'oauth_signature_method', + 'oauth_version', + 'oauth_signature', + 'lti_message_type', + 'oauth_callback', + 'lis_outcome_service_url', + 'lis_result_sourcedid', + 'launch_presentation_return_url' + ] - if sorted(correct_dict.keys()) != sorted(post_dict.keys()): - error_message = "Incorrect LTI header" + if sorted(correct_keys) != sorted(post_dict.keys()): + status_message = "Incorrect LTI header" else: - error_message = "This is LTI tool." + params = {k: v for k, v in post_dict.items() if k != 'oauth_signature'} + if self.server.check_oauth_signature(params, post_dict['oauth_signature']): + status_message = "This is LTI tool." + else: + status_message = "Wrong LTI signature" else: - error_message = "Invalid request URL" + status_message = "Invalid request URL" - self._send_response(error_message) + self._send_response(status_message) def _send_head(self): ''' @@ -75,7 +78,7 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler): ''' try: length = int(self.headers.getheader('content-length')) - post_dict = urlparse.parse_qs(self.rfile.read(length)) + post_dict = urlparse.parse_qs(self.rfile.read(length), keep_blank_values=True) # The POST dict will contain a list of values for each key. # None of our parameters are lists, however, so we map [val] --> val. #I f the list contains multiple entries, we pick the first one @@ -110,8 +113,8 @@ class MockLTIRequestHandler(BaseHTTPRequestHandler): self.wfile.write(response_str) def _is_correct_lti_request(self): - '''If url to get LTI is correct.''' - return 'correct_lti_endpoint' in self.path + '''If url to LTI tool is correct.''' + return self.server.oauth_settings['lti_endpoint'] in self.path class MockLTIServer(HTTPServer): @@ -120,22 +123,13 @@ class MockLTIServer(HTTPServer): to POST requests to localhost. ''' - def __init__(self, port_num, oauth={}): + def __init__(self, address): ''' Initialize the mock XQueue server instance. - *port_num* is the localhost port to listen to - - *grade_response_dict* is a dictionary that will be JSON-serialized - and sent in response to XQueue grading requests. + *address* is the (host, host's port to listen to) tuple. ''' - - self.clent_key = oauth.get('client_key', '') - self.clent_secret = oauth.get('client_secret', '') - self.check_oauth() - handler = MockLTIRequestHandler - address = ('', port_num) HTTPServer.__init__(self, address, handler) def shutdown(self): @@ -144,15 +138,34 @@ class MockLTIServer(HTTPServer): ''' # First call superclass shutdown() HTTPServer.shutdown(self) - # We also need to manually close the socket self.socket.close() - def get_oauth_signature(self): - '''test''' - return self._signature + def check_oauth_signature(self, params, client_signature): + ''' + Checks oauth signature from client. - def check_oauth(self): - ''' generate oauth signature ''' - self._signature = '12345' + `params` are params from post request except signature, + `client_signature` is signature from request. + + Builds mocked request and verifies hmac-sha1 signing:: + 1. builds string to sign from `params`, `url` and `http_method`. + 2. signs it with `client_secret` which comes from server settings. + 3. obtains signature after sign and then compares it with request.signature + (request signature comes form client in request) + + Returns `True` if signatures are correct, otherwise `False`. + + ''' + client_secret = unicode(self.oauth_settings['client_secret']) + url = self.oauth_settings['lti_base'] + self.oauth_settings['lti_endpoint'] + + request = mock.Mock() + + request.params = [(unicode(k), unicode(v)) for k, v in params.items()] + request.uri = unicode(url) + request.http_method = u'POST' + request.signature = unicode(client_signature) + + return signature.verify_hmac_sha1(request, client_secret)