diff --git a/openedx/core/djangoapps/user_authn/views/login.py b/openedx/core/djangoapps/user_authn/views/login.py index c9d8faed92..4bcc4226f3 100644 --- a/openedx/core/djangoapps/user_authn/views/login.py +++ b/openedx/core/djangoapps/user_authn/views/login.py @@ -483,7 +483,7 @@ def enterprise_selection_page(request, user, next_url): @ensure_csrf_cookie @require_http_methods(['POST']) @ratelimit( - key='openedx.core.djangoapps.util.ratelimit.request_post_email', + key='openedx.core.djangoapps.util.ratelimit.request_post_email_or_username', rate=settings.LOGISTRATION_PER_EMAIL_RATELIMIT_RATE, method='POST', ) # lint-amnesty, pylint: disable=too-many-statements @@ -526,7 +526,7 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement _parse_analytics_param_for_course_id(request) third_party_auth_requested = third_party_auth.is_enabled() and pipeline.running(request) - first_party_auth_requested = bool(request.POST.get('email')) or bool(request.POST.get('password')) + first_party_auth_requested = any(bool(request.POST.get(p)) for p in ['email', 'email_or_username', 'password']) is_user_third_party_authenticated = False set_custom_attribute('login_user_course_id', request.POST.get('course_id')) diff --git a/openedx/core/djangoapps/user_authn/views/tests/test_login.py b/openedx/core/djangoapps/user_authn/views/tests/test_login.py index e85611e996..a205362b3b 100644 --- a/openedx/core/djangoapps/user_authn/views/tests/test_login.py +++ b/openedx/core/djangoapps/user_authn/views/tests/test_login.py @@ -573,6 +573,24 @@ class LoginTest(SiteMixin, CacheIsolationTestCase, OpenEdxEventsTestMixin): response, _audit_log = self._login_response(self.user_email, 'wrong_password') self._assert_response(response, success=False, value='Too many failed login attempts') + @patch('openedx.core.djangoapps.util.ratelimit.real_ip') + def test_excessive_login_attempts_by_username(self, real_ip_mock): + # try logging in 6 times, the defalutlimit for the number of failed + # login attempts in one 5 minute period before the rate gets limited + # for a specific username. + + # We freeze time to deal with the fact that rate limit time boundaries + # are not predictable and we don't want the test to be flaky. + with freeze_time(): + for i in range(6): + password = f'test_password{i}' + # Provide unique IPs so we don't get ip rate limited. + real_ip_mock.return_value = f'192.168.1.{i}' + self._login_response(self.username, password) + # check to see if this response indicates that this was ratelimited + response, _audit_log = self._login_response(self.username, 'wrong_password') + self._assert_response(response, success=False, value='Too many failed login attempts') + def test_excessive_login_attempts_by_ip(self): # try logging in 5 times, the default limit for the number of failed # login attempts in one 5 minute period before the rate gets limited diff --git a/openedx/core/djangoapps/util/ratelimit.py b/openedx/core/djangoapps/util/ratelimit.py index 8c1ee0f88d..a4ffec621c 100644 --- a/openedx/core/djangoapps/util/ratelimit.py +++ b/openedx/core/djangoapps/util/ratelimit.py @@ -52,3 +52,23 @@ def request_data_email(group, request) -> str: # pylint: disable=unused-argumen email = str(uuid4()) return email + + +def request_post_email_or_username(group, request) -> str: # pylint: disable=unused-argument + """ + Return the the email or email_or_username post param if it exists, otherwise return a + random id. + + If the request doesn't have an email or email_or_username post body param, treat it as + a unique key. This will probably mean that it will not get rate limited. + + This ratelimit key function is meant to be used with the user_authn/views/login.py::login_user + function. To rate-limit any first party auth. For 3rd party auth, there is separate rate limiting + currently in place so we don't do any rate limiting for that case here. + """ + + email_or_username = request.POST.get('email_or_username') or request.POST.get('email') + if not email_or_username: + email_or_username = str(uuid4()) + + return email_or_username diff --git a/openedx/core/djangoapps/util/tests/test_ratelimit.py b/openedx/core/djangoapps/util/tests/test_ratelimit.py index 35be364ba1..0b60c9b068 100644 --- a/openedx/core/djangoapps/util/tests/test_ratelimit.py +++ b/openedx/core/djangoapps/util/tests/test_ratelimit.py @@ -46,3 +46,32 @@ class TestRateLimiting(TestCase): """ XForwardedForMiddleware().process_request(self.request) assert ratelimit.real_ip(None, self.request) == '7.8.9.0' + + def test_request_post_email(self): + """ + Tests post email param. + """ + expected_email = 'test@example.com' + self.request.POST = {'email': expected_email} + assert ratelimit.request_post_email(None, self.request) == expected_email + + def test_request_data_email(self): + """ + Tests data email param. + """ + expected_email = 'test@example.com' + self.request.data = {'email': expected_email} + assert ratelimit.request_data_email(None, self.request) == expected_email + + @ddt.data( + ('email', 'test@example.com'), + ('email_or_username', 'testUsername8967'), + ('email_or_username', 'testUsername@example.com') + ) + @ddt.unpack + def test_request_post_email_or_username(self, param_name, expected_username_or_email): + """ + Tests post email_or_username param. + """ + self.request.POST = {param_name: expected_username_or_email} + assert ratelimit.request_post_email_or_username(None, self.request) == expected_username_or_email