diff --git a/openedx/core/djangoapps/xblock/tests/__init__.py b/openedx/core/djangoapps/xblock/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/openedx/core/djangoapps/xblock/tests/test_utils.py b/openedx/core/djangoapps/xblock/tests/test_utils.py new file mode 100644 index 0000000000..a3510d965a --- /dev/null +++ b/openedx/core/djangoapps/xblock/tests/test_utils.py @@ -0,0 +1,119 @@ +""" +Tests for xblock utils. +""" +import datetime + +import pytest +from django.test import override_settings +from freezegun import freeze_time + +from openedx.core.djangoapps.xblock.utils import ( + get_secure_token_for_xblock_handler, + validate_secure_token_for_xblock_handler +) + +REFERENCE_PARAMS = { + "generation_user_id": "12344", + "validation_user_id": "12344", + "generation_block_key_str": "some key", + "validation_block_key_str": "some key", + "generation_secret_key": "baseline_key", + "validation_secret_key": "baseline_key", + "generation_xblock_handler_token_keys": None, + "validation_xblock_handler_token_keys": None, + # A reference time that produces a token that will expire within the next day + # but not for a few hours. + "reference_time": datetime.datetime(2021, 1, 28, 13, 26, 38, 787309, datetime.timezone.utc), + "validation_time_delta_s": 0, +} + + +@pytest.mark.parametrize( + "param_delta,expected_validation", + [ + # Happy Path case with the above REFERENCE_PARAMS. + ({}, True), + # Ensure token still valid in 1 hour + ({"validation_time_delta_s": 3600}, True), + # Ensure token still valid in 1 day, this should be true but isn't due + # to the bug that will be fixed in ARCHBOM-1677 + ({"validation_time_delta_s": 86400}, False), + # Ensure token is invalid after 5 days + ({"validation_time_delta_s": 86400 * 5}, False), + # Ensure token is not valid 5 days before generation + # In the case where the validating server is really skewed + # from the generating server. + ({"validation_time_delta_s": 86400 * -5}, False), + # Different user tries to use your token. + ({"validation_user_id": 54321}, False), + # Access a different block. + ({"validation_block_key_str": "some other block"}, False), + # If key is changed, token shouldn't be valid. + ({"validation_secret_key": "new secret key"}, False), + # Test with token keys setting overrides that don't have anything to do with the secret key. + ({"generation_xblock_handler_token_keys": [], "validation_xblock_handler_token_keys": []}, True), + # Test migration from secret key to keys list. The secret_key has changed + # but the old secret key is in the token keys list. + ( + { + "validation_xblock_handler_token_keys": ["baseline_key", ], + "validation_secret_key": "new secret key", + }, + True, + ), + # Test tokens generated with the old key will be valid when there is a new primary + # token key at validation time. + ( + { + "generation_xblock_handler_token_keys": ["baseline_key", ], + "validation_xblock_handler_token_keys": ["new token key", "baseline_key", ], + "validation_secret_key": "new secret key", + }, + True, + ), + # Test token generated with the new key is valid with the new key even if the secret key + # changes. + ( + { + "generation_xblock_handler_token_keys": ["new token key"], + "validation_xblock_handler_token_keys": ["new token key"], + "validation_secret_key": "new secret key", # Ensure that we're not matching with secret key + }, + True, + ), + # Test that if the new token key is used to generate, that validating with the SECRET_KEY won't work. + ( + { + "generation_xblock_handler_token_keys": ["new token key"], + "validation_xblock_handler_token_keys": [], + }, + False, + ), + ], +) +def test_secure_token(param_delta: dict, expected_validation: bool): + params = {} + params.update(REFERENCE_PARAMS) + params.update(param_delta) + reference_time = params['reference_time'] + + with override_settings( + SECRET_KEY=params["generation_secret_key"], + XBLOCK_HANDLER_TOKEN_KEYS=params["generation_xblock_handler_token_keys"], + ): + with freeze_time(reference_time): + token = get_secure_token_for_xblock_handler( + params["generation_user_id"], params["generation_block_key_str"] + ) + + with override_settings( + SECRET_KEY=params["validation_secret_key"], + XBLOCK_HANDLER_TOKEN_KEYS=params["validation_xblock_handler_token_keys"], + ): + with freeze_time(reference_time + datetime.timedelta(seconds=params["validation_time_delta_s"])): + assert ( + validate_secure_token_for_xblock_handler( + params["validation_user_id"], params["validation_block_key_str"], token + ) + == expected_validation + ) diff --git a/openedx/core/djangoapps/xblock/utils.py b/openedx/core/djangoapps/xblock/utils.py index 359433774f..ef54c016a3 100644 --- a/openedx/core/djangoapps/xblock/utils.py +++ b/openedx/core/djangoapps/xblock/utils.py @@ -46,24 +46,102 @@ def get_secure_token_for_xblock_handler(user_id, block_key_str, time_idx=0): token each user gets for a given XBlock doesn't change, which makes debugging and reasoning about the system simpler.) """ + # Impact of exposure of the SECRET_KEY or XBLOCK_HANDLER_TOKEN_KEYS: + # An actor with the key could produce valid secure tokens for any given user_id and xblock and + # could submit answers to questions on behalf of other users. This would violate the integrity + # of the answer data on our platform. + # Estimated CVSS Base Score: 7.5 + # Score URL: https://www.first.org/cvss/calculator/3.1#CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:N + # Environment score not given because the importance of the environment may change over time. + + # If this key is actually exposed to malicious actors, we should do a rapid rotation that + # breaks people because in this case a malicious actor can generate valid tokens to submit + # answers as any user. + + # Transitioning from SECRET_KEY to XBLOCK_HANDLER_TOKEN_KEYS: + # + # 1. Add the current secret key and a new xblock handler specific secret key to the + # XBLOCK_HANDLER_TOKEN_KEYS list in your settings file or yaml. The order of the keys + # matters and so the new xblock specific key should be at index 0. + # eg. XBLOCK_HANDLER_TOKEN_KEYS = ["", ""] + # 2. Wait 4 days after the code has been deployed to production. + # 3. Remove the django secret key from the XBLOCK_HANDLER_TOKEN_KEYS list. + # eg. XBLOCK_HANDLER_TOKEN_KEYS = [""] + + # Rotation Process (Assumes you've already transitioned from SECRET_KEY to XBLOCK_HANDLER_TOKEN_KEYS): + # + # If the likelihood that the old key has been exposed to malicious actors is high, you should do a rapid rotation + # that breaks people because in this case a malicious actor can generate valid tokens to submit answers + # impersonating any user. If the likelihood of a malicious actor is high, skip step 2 from this process (Start + # using only the new key right away.) + # 1. Add the newly generated hashing key at index 0 of the XBLOCK_HANDLER_TOKEN_KEYS list in your settings. + # eg. XBLOCK_HANDLER_TOKEN_KEYS = ["", ""] + # 2. Wait 4 days after the code has been deployed to production. + # 3. Remove the old key from the list. + # eg. XBLOCK_HANDLER_TOKEN_KEYS = [""] + + # Fall back to SECRET_KEY if XBLOCK_HANDLER_TOKEN_KEYS is not set or is an empty list. + if getattr(settings, "XBLOCK_HANDLER_TOKEN_KEYS", None): + hashing_key = settings.XBLOCK_HANDLER_TOKEN_KEYS[0] + else: + hashing_key = settings.SECRET_KEY + + return _get_secure_token_for_xblock_handler(user_id, block_key_str, time_idx, hashing_key) + + +def _get_secure_token_for_xblock_handler(user_id, block_key_str, time_idx: int, hashing_key: str): + """ + Internal function to extract repeating hashing steps which we + call multiple times with different time_idx and hashing key. + """ + # This was supposed to be an interval boundary number that matches + # the current 2 day chunk(midnight UTC of every other day). + # Instead it's a number that will always be consistent but + # otherwise does not have a semantic meaning. + # Right now the math is missing a multiplication by TOKEN_PERIOD after + # the math.floor which means the unit is days which is then added to + # 0 or -TOKEN_PERIOD seconds. + # Right now this is only valid for 0-2 days instead of the intended + # 2-4 days because of the units issue above. Correcting the math should + # be possible but we are not going to do it in the middle of the other + # change we're making. We've made https://openedx.atlassian.net/browse/ARCHBOM-1677 + # to come back and deal with it. TOKEN_PERIOD = 24 * 60 * 60 * 2 # These URLs are valid for 2-4 days time_token = math.floor(time.time() / TOKEN_PERIOD) time_token += TOKEN_PERIOD * time_idx - check_string = text_type(time_token) + ':' + text_type(user_id) + ':' + block_key_str - secure_key = hmac.new(settings.SECRET_KEY.encode('utf-8'), check_string.encode('utf-8'), hashlib.sha256).hexdigest() + check_string = str(time_token) + ':' + str(user_id) + ':' + block_key_str + secure_key = hmac.new(hashing_key.encode('utf-8'), check_string.encode('utf-8'), hashlib.sha256).hexdigest() return secure_key[:20] def validate_secure_token_for_xblock_handler(user_id, block_key_str, token): """ Returns True if the specified handler authentication token is valid for the - given XBlock ID and user ID. Otherwise returns false. + given Xblock ID and user ID. Otherwise returns false. See get_secure_token_for_xblock_handler """ - token = token.encode('utf-8') # This line isn't needed after python 3, nor the .encode('utf-8') below - token_expected = get_secure_token_for_xblock_handler(user_id, block_key_str).encode('utf-8') - prev_token_expected = get_secure_token_for_xblock_handler(user_id, block_key_str, -1).encode('utf-8') + if getattr(settings, "XBLOCK_HANDLER_TOKEN_KEYS", None): + hashing_keys = settings.XBLOCK_HANDLER_TOKEN_KEYS + else: + hashing_keys = [settings.SECRET_KEY] + + final_result = False + for key in hashing_keys: + final_result |= _validate_secure_token_for_xblock_handler(user_id, block_key_str, token, key) + + # We check all keys so that the computation takes a constant amount of + # time to produce its answer (security best practice). + return final_result + + +def _validate_secure_token_for_xblock_handler(user_id, block_key_str, token: str, hashing_key): + """ + Internal function to validate the incoming token with tokens generated with the given + hashing key. + """ + token_expected = _get_secure_token_for_xblock_handler(user_id, block_key_str, 0, hashing_key) + prev_token_expected = _get_secure_token_for_xblock_handler(user_id, block_key_str, -1, hashing_key) result1 = hmac.compare_digest(token, token_expected) result2 = hmac.compare_digest(token, prev_token_expected) # All computations happen above this line so this function always takes a