diff --git a/openedx/core/djangoapps/credit/api/provider.py b/openedx/core/djangoapps/credit/api/provider.py index 38cbbd9942..e19fcfa3d3 100644 --- a/openedx/core/djangoapps/credit/api/provider.py +++ b/openedx/core/djangoapps/credit/api/provider.py @@ -112,6 +112,28 @@ def get_credit_provider_info(request, provider_id): # pylint: disable=unused-ar return JsonResponse(credit_provider_data) +def check_keys_exist(shared_secret_key, provider_id): + """ + Verify that a key is available for single or multiple key support scenarios. + + Raise CreditProviderNotConfigured if no key available. + """ + # Accounts for old way of storing provider key + if shared_secret_key is None: + msg = u'Credit provider with ID "{provider_id}" does not have a secret key configured.'.format( + provider_id=provider_id + ) + log.error(msg) + raise CreditProviderNotConfigured(msg) + + # Accounts for new way of storing provider key + elif isinstance(shared_secret_key, list) and not any(shared_secret_key): + msg = 'Could not retrieve secret key for credit provider [{}]. ' \ + 'Unable to validate requests from provider.'.format(provider_id) + log.error(msg) + raise CreditProviderNotConfigured(msg) + + @transaction.atomic def create_credit_request(course_key, provider_id, username): """ @@ -212,12 +234,12 @@ def create_credit_request(course_key, provider_id, username): # That way, if there's a misconfiguration, we won't have requests # in our system that we know weren't sent to the provider. shared_secret_key = get_shared_secret_key(credit_provider.provider_id) - if shared_secret_key is None: - msg = u'Credit provider with ID "{provider_id}" does not have a secret key configured.'.format( - provider_id=credit_provider.provider_id - ) - log.error(msg) - raise CreditProviderNotConfigured(msg) + check_keys_exist(shared_secret_key, credit_provider.provider_id) + + if isinstance(shared_secret_key, list): + # if keys exist, and keys are stored as a list + # then we know at least 1 is available for [0] + shared_secret_key = [key for key in shared_secret_key if key][0] # Initiate a new request if one has not already been created credit_request, created = CreditRequest.objects.get_or_create( diff --git a/openedx/core/djangoapps/credit/serializers.py b/openedx/core/djangoapps/credit/serializers.py index 277591bfda..2ff76a31b0 100644 --- a/openedx/core/djangoapps/credit/serializers.py +++ b/openedx/core/djangoapps/credit/serializers.py @@ -87,20 +87,62 @@ class CreditProviderCallbackSerializer(serializers.Serializer): # pylint:disabl return value - def validate_signature(self, value): - """ Validate the signature and ensure the provider is setup properly. """ - provider_id = self.provider.provider_id - secret_key = get_shared_secret_key(provider_id) + def _check_keys_exist_for_provider(self, secret_key, provider_id): + """ + Verify there are keys available in the secret to + verify signature against. + + Throw error if none are available. + """ + + # Accounts for old way of storing provider key if secret_key is None: msg = 'Could not retrieve secret key for credit provider [{}]. ' \ 'Unable to validate requests from provider.'.format(provider_id) log.error(msg) raise PermissionDenied(msg) + # Accounts for new way of storing provider key + # We need at least 1 key here that we can use to validate the signature + if isinstance(secret_key, list) and not any(secret_key): + msg = 'Could not retrieve secret key for credit provider [{}]. ' \ + 'Unable to validate requests from provider.'.format(provider_id) + log.error(msg) + raise PermissionDenied(msg) + + def _compare_signatures(self, secret_key, provider_id): + """ + Compare signature we received with the signature we expect/have. + + Throw an error if they don't match. + """ + data = self.initial_data actual_signature = data["signature"] - if signature(data, secret_key) != actual_signature: + + # Accounts for old way of storing provider key + if isinstance(secret_key, six.text_type) and signature(data, secret_key) != actual_signature: msg = 'Request from credit provider [{}] had an invalid signature.'.format(provider_id) raise PermissionDenied(msg) + # Accounts for new way of storing provider key + if isinstance(secret_key, list): + # Received value just needs to match one of the keys we have + key_match = False + for secretvalue in secret_key: + if signature(data, secretvalue) == actual_signature: + key_match = True + + if not key_match: + msg = 'Request from credit provider [{}] had an invalid signature.'.format(provider_id) + raise PermissionDenied(msg) + + def validate_signature(self, value): + """ Validate the signature and ensure the provider is setup properly. """ + provider_id = self.provider.provider_id + secret_key = get_shared_secret_key(provider_id) + + self._check_keys_exist_for_provider(secret_key, provider_id) + self._compare_signatures(secret_key, provider_id) + return value diff --git a/openedx/core/djangoapps/credit/signature.py b/openedx/core/djangoapps/credit/signature.py index 16f23fb93a..0fb3919ebe 100644 --- a/openedx/core/djangoapps/credit/signature.py +++ b/openedx/core/djangoapps/credit/signature.py @@ -27,18 +27,41 @@ from django.conf import settings log = logging.getLogger(__name__) +def _encode_secret(secret, provider_id): + """ + Helper function for encoding text_type secrets into ascii. + """ + try: + secret.encode('ascii') + except UnicodeEncodeError: + secret = None + log.error(u'Shared secret key for credit provider "%s" contains non-ASCII unicode.', provider_id) + + return secret + + def get_shared_secret_key(provider_id): """ - Retrieve the shared secret key for a particular credit provider. + Retrieve the shared secret for a particular credit provider. + + It is possible for the secret to be stored in 2 ways: + 1 - a key/value pair of provider_id and secret string + {'cool_school': '123abc'} + 2 - a key/value pair of provider_id and secret list + {'cool_school': ['987zyx', '123abc']} """ + secret = getattr(settings, "CREDIT_PROVIDER_SECRET_KEYS", {}).get(provider_id) + # When secret is just characters if isinstance(secret, six.text_type): - try: - secret.encode('ascii') - except UnicodeEncodeError: - secret = None - log.error(u'Shared secret key for credit provider "%s" contains non-ASCII unicode.', provider_id) + secret = _encode_secret(secret, provider_id) + + # When secret is a list containing multiple keys, encode all of them + elif isinstance(secret, list): + for index, secretvalue in enumerate(secret): + if isinstance(secretvalue, six.text_type): + secret[index] = _encode_secret(secretvalue, provider_id) return secret diff --git a/openedx/core/djangoapps/credit/tests/test_api.py b/openedx/core/djangoapps/credit/tests/test_api.py index 8ff26e3a49..69bc5f71a1 100644 --- a/openedx/core/djangoapps/credit/tests/test_api.py +++ b/openedx/core/djangoapps/credit/tests/test_api.py @@ -46,12 +46,13 @@ from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory TEST_CREDIT_PROVIDER_SECRET_KEY = "931433d583c84ca7ba41784bad3232e6" +TEST_CREDIT_PROVIDER_SECRET_KEY_TWO = "abcf433d583c8baebae1784bad3232e6" TEST_ECOMMERCE_WORKER = 'test_worker' @override_settings(CREDIT_PROVIDER_SECRET_KEYS={ "hogwarts": TEST_CREDIT_PROVIDER_SECRET_KEY, - "ASU": TEST_CREDIT_PROVIDER_SECRET_KEY, + "ASU": [TEST_CREDIT_PROVIDER_SECRET_KEY_TWO, TEST_CREDIT_PROVIDER_SECRET_KEY], "MIT": TEST_CREDIT_PROVIDER_SECRET_KEY }) class CreditApiTestBase(ModuleStoreTestCase): diff --git a/openedx/core/djangoapps/credit/tests/test_serializers.py b/openedx/core/djangoapps/credit/tests/test_serializers.py index a464342c5f..9c94037983 100644 --- a/openedx/core/djangoapps/credit/tests/test_serializers.py +++ b/openedx/core/djangoapps/credit/tests/test_serializers.py @@ -3,8 +3,10 @@ import six from django.test import TestCase +from django.test.utils import override_settings +from rest_framework.exceptions import PermissionDenied -from openedx.core.djangoapps.credit import serializers +from openedx.core.djangoapps.credit import serializers, signature from openedx.core.djangoapps.credit.tests.factories import CreditEligibilityFactory, CreditProviderFactory from student.tests.factories import UserFactory @@ -43,3 +45,84 @@ class CreditEligibilitySerializerTests(TestCase): 'username': user.username, } self.assertDictEqual(serializer.data, expected) + + +class CreditProviderCallbackSerializerTests(TestCase): + """ CreditProviderCallbackSerializer tests. """ + + def test_check_keys_exist_for_provider_string(self): + """ Verify _check_keys_exist_for_provider errors if key is None """ + + secret_key = None + provider_id = 'asu' + + serializer = serializers.CreditProviderCallbackSerializer() + with self.assertRaises(PermissionDenied): + serializer._check_keys_exist_for_provider(secret_key, provider_id) + + def test_check_keys_exist_for_provider_list_no_keys(self): + """ + Verify _check_keys_exist_for_provider errors if no keys in list + are truthy. (This accounts for there being 2 keys present both + of which are None due to ascii encode issues) + """ + + secret_key = [None, None] + provider_id = 'asu' + + serializer = serializers.CreditProviderCallbackSerializer() + with self.assertRaises(PermissionDenied): + serializer._check_keys_exist_for_provider(secret_key, provider_id) + + def test_check_keys_exist_for_provider_list_with_key_present(self): + """ + Verify _check_keys_exist_for_provider does not error when at least + 1 key in config is a valid key + """ + + secret_key = [None, 'abc1234', None] + provider_id = 'asu' + + serializer = serializers.CreditProviderCallbackSerializer() + result = serializer._check_keys_exist_for_provider(secret_key, provider_id) + # No return value, so we expect successful execution to return None + assert result is None + + def test_compare_signatures_string_key(self): + """ Verify compare_signature errors if string key does not match. """ + provider = CreditProviderFactory( + provider_id='asu', + active=False, + ) + + # Create a serializer that has a signature which was created with a key + # that we do not have in our system. + sig = signature.signature({}, 'iamthewrongkey') + serializer = serializers.CreditProviderCallbackSerializer( + data={'signature': sig} + ) + with self.assertRaises(PermissionDenied): + # The first arg here is key we have (that doesn't match the sig) + serializer._compare_signatures('abcd1234', provider.provider_id) + + def test_compare_signatures_list_key(self): + """ + Verify compare_signature errors if no keys that are stored in the list + in config match the key handed in the signature. + """ + provider = CreditProviderFactory( + provider_id='asu', + active=False, + ) + + sig = signature.signature({}, 'iamthewrongkey') + serializer = serializers.CreditProviderCallbackSerializer( + data={'signature': sig} + ) + + with self.assertRaises(PermissionDenied): + # The first arg here is the list of keys he have (that dont matcht the sig) + serializer._compare_signatures( + ['abcd1234', 'xyz789'], + provider.provider_id + ) diff --git a/openedx/core/djangoapps/credit/tests/test_signature.py b/openedx/core/djangoapps/credit/tests/test_signature.py index 5d40963bff..9a431f87de 100644 --- a/openedx/core/djangoapps/credit/tests/test_signature.py +++ b/openedx/core/djangoapps/credit/tests/test_signature.py @@ -42,3 +42,37 @@ class SignatureTest(TestCase): key = signature.get_shared_secret_key("asu") sig = signature.signature({'name': u'Ed XavĂ­er'}, key) self.assertEqual(sig, "76b6c9a657000829253d7c23977b35b34ad750c5681b524d7fdfb25cd5273cec") + + @override_settings(CREDIT_PROVIDER_SECRET_KEYS={ + "asu": 'abcd1234', + }) + def test_get_shared_secret_key_string(self): + """ + get_shared_secret_key should return ascii encoded string if provider + secret is stored as a single key. + """ + key = signature.get_shared_secret_key("asu") + self.assertEqual(key, 'abcd1234') + + @override_settings(CREDIT_PROVIDER_SECRET_KEYS={ + "asu": ['abcd1234', 'zyxw9876'] + }) + def test_get_shared_secret_key_string_multiple_keys(self): + """ + get_shared_secret_key should return ascii encoded strings if provider + secret is stored as a list for multiple key support. + """ + key = signature.get_shared_secret_key("asu") + self.assertEqual(key, ['abcd1234', 'zyxw9876']) + + @override_settings(CREDIT_PROVIDER_SECRET_KEYS={ + "asu": [u'\u4567', u'zyxw9876'] + }) + def test_get_shared_secret_key_string_multiple_keys_with_none(self): + """ + get_shared_secret_key should return ascii encoded string if provider + secret is stored as a list for multiple key support, replacing None + for unencodable strings. + """ + key = signature.get_shared_secret_key("asu") + self.assertEqual(key, [None, 'zyxw9876']) diff --git a/openedx/core/djangoapps/credit/tests/test_views.py b/openedx/core/djangoapps/credit/tests/test_views.py index db82d42a7b..dffbad0b47 100644 --- a/openedx/core/djangoapps/credit/tests/test_views.py +++ b/openedx/core/djangoapps/credit/tests/test_views.py @@ -367,36 +367,44 @@ class CreditProviderRequestCreateViewTests(ApiTestCaseMixin, UserMixin, TestCase ) secret_key = 'secret' - with override_settings(CREDIT_PROVIDER_SECRET_KEYS={self.provider.provider_id: secret_key}): - response = self.post_credit_request(username, course_key) - self.assertEqual(response.status_code, 200) + # Provider keys can be stored as a string or list of strings + secret_key_with_key_as_string = {self.provider.provider_id: secret_key} + # The None represents a key that was not ascii encodable + secret_key_with_key_as_list = { + self.provider.provider_id: [secret_key, None] + } - # Check that the user's request status is pending - request = CreditRequest.objects.get(username=username, course__course_key=course_key) - self.assertEqual(request.status, 'pending') + for secret_key_dict in [secret_key_with_key_as_string, secret_key_with_key_as_list]: + with override_settings(CREDIT_PROVIDER_SECRET_KEYS=secret_key_dict): + response = self.post_credit_request(username, course_key) + self.assertEqual(response.status_code, 200) - # Check request parameters - content = json.loads(response.content.decode('utf-8')) - parameters = content['parameters'] + # Check that the user's request status is pending + request = CreditRequest.objects.get(username=username, course__course_key=course_key) + self.assertEqual(request.status, 'pending') - self.assertEqual(content['url'], self.provider.provider_url) - self.assertEqual(content['method'], 'POST') - self.assertEqual(len(parameters['request_uuid']), 32) - self.assertEqual(parameters['course_org'], course_key.org) - self.assertEqual(parameters['course_num'], course_key.course) - self.assertEqual(parameters['course_run'], course_key.run) - self.assertEqual(parameters['final_grade'], six.text_type(final_grade)) - self.assertEqual(parameters['user_username'], username) - self.assertEqual(parameters['user_full_name'], self.user.get_full_name()) - self.assertEqual(parameters['user_mailing_address'], '') - self.assertEqual(parameters['user_country'], '') + # Check request parameters + content = json.loads(response.content.decode('utf-8')) + parameters = content['parameters'] - # The signature is going to change each test run because the request - # is assigned a different UUID each time. - # For this reason, we use the signature function directly - # (the "signature" parameter will be ignored when calculating the signature). - # Other unit tests verify that the signature function is working correctly. - self.assertEqual(parameters['signature'], signature(parameters, secret_key)) + self.assertEqual(content['url'], self.provider.provider_url) + self.assertEqual(content['method'], 'POST') + self.assertEqual(len(parameters['request_uuid']), 32) + self.assertEqual(parameters['course_org'], course_key.org) + self.assertEqual(parameters['course_num'], course_key.course) + self.assertEqual(parameters['course_run'], course_key.run) + self.assertEqual(parameters['final_grade'], six.text_type(final_grade)) + self.assertEqual(parameters['user_username'], username) + self.assertEqual(parameters['user_full_name'], self.user.get_full_name()) + self.assertEqual(parameters['user_mailing_address'], '') + self.assertEqual(parameters['user_country'], '') + + # The signature is going to change each test run because the request + # is assigned a different UUID each time. + # For this reason, we use the signature function directly + # (the "signature" parameter will be ignored when calculating the signature). + # Other unit tests verify that the signature function is working correctly. + self.assertEqual(parameters['signature'], signature(parameters, secret_key)) def test_post_invalid_provider(self): """ Verify the endpoint returns HTTP 404 if the credit provider is not valid. """ @@ -465,6 +473,21 @@ class CreditProviderRequestCreateViewTests(ApiTestCaseMixin, UserMixin, TestCase response = self.post_credit_request(self.user.username, self.eligibility.course.course_key) self.assertEqual(response.status_code, 400) + def test_post_secret_key_not_set_key_as_list(self): + """ Verify the endpoint returns HTTP 400 if we attempt to create a + request for a provider with no secret key set with keys set as list. """ + # Enable provider integration + self.provider.enable_integration = True + self.provider.save() + + # Cannot initiate a request because we cannot sign it + secret_key_with_key_as_list = { + self.provider.provider_id: [] + } + with override_settings(CREDIT_PROVIDER_SECRET_KEYS=secret_key_with_key_as_list): + response = self.post_credit_request(self.user.username, self.eligibility.course.course_key) + self.assertEqual(response.status_code, 400) + @ddt.ddt @skip_unless_lms