diff --git a/openedx/core/djangoapps/password_policy/hibp.py b/openedx/core/djangoapps/password_policy/hibp.py index 901b69ddd6..5be829d308 100644 --- a/openedx/core/djangoapps/password_policy/hibp.py +++ b/openedx/core/djangoapps/password_policy/hibp.py @@ -2,7 +2,7 @@ Wrapper to use pwnedpassword Service """ - +import hashlib import logging import requests @@ -13,6 +13,9 @@ from openedx.core.djangoapps.user_authn.config.waffle import ENABLE_PWNED_PASSWO log = logging.getLogger(__name__) +SHA_LENGTH = 40 +HEX_BASE = 16 + def convert_password_tuple(value): """ @@ -47,6 +50,10 @@ class PwnedPasswordsAPI: "7ecd77ecd77ecd7": 12, } """ + is_encrypted = PwnedPasswordsAPI.is_sha1(password) + if not is_encrypted: + password = hashlib.sha1(password.encode('utf-8')).hexdigest().upper() + range_url = PwnedPasswordsAPI.API_URL + '/range/{}'.format(password[:5]) if ENABLE_PWNED_PASSWORD_API.is_enabled(): @@ -61,3 +68,18 @@ class PwnedPasswordsAPI: except Exception as exc: # pylint: disable=W0703 log.exception(f"Unable to range the password: {exc}") + + @staticmethod + def is_sha1(maybe_sha): + """ + Validates whether the provided string is sha1 encrypted or not + """ + if len(maybe_sha) != SHA_LENGTH: + return False + + try: + sha_int = int(maybe_sha, HEX_BASE) + except ValueError: + return False + + return True diff --git a/openedx/core/djangoapps/password_policy/tests/test_hibp.py b/openedx/core/djangoapps/password_policy/tests/test_hibp.py index 6f0040b33c..7913b4b54d 100644 --- a/openedx/core/djangoapps/password_policy/tests/test_hibp.py +++ b/openedx/core/djangoapps/password_policy/tests/test_hibp.py @@ -42,13 +42,22 @@ class PwnedPasswordsAPITest(TestCase): """ Test that captures the warning log on timeout """ + password = 'testpassword' + password_hash_hex = '8BB6118F8FD6935AD0876A3BE34A717D32708FFD' with LogCapture(log.name) as log_capture: - PwnedPasswordsAPI.range('7ecd7') + PwnedPasswordsAPI.range(password) log_capture.check_present( ( log.name, 'WARNING', - 'Request timed out for 7ecd7' + 'Request timed out for {}'.format(password_hash_hex) ) ) - assert 'Request timed out for 7ecd7' in log_capture.records[0].getMessage() + assert 'Request timed out for {}'.format(password_hash_hex) in log_capture.records[0].getMessage() + + def test_provided_string_is_sha1_or_not(self): + hashed_password = '8BB6118F8FD6935AD0876A3BE34A717D32708FFD' + self.assertTrue(PwnedPasswordsAPI.is_sha1(hashed_password)) + + raw_password = 'testpassword' + self.assertFalse(PwnedPasswordsAPI.is_sha1(raw_password))