fix: fir segment event (#34279)

fire segment event for PWNED_PASSWORD on registration page password validation

VAN-1830
This commit is contained in:
Mubbshar Anwar
2024-03-01 10:22:57 +05:00
committed by GitHub
parent ea55eca47e
commit b6d89bcd59
6 changed files with 91 additions and 9 deletions

View File

@@ -12,6 +12,7 @@ from django.core.exceptions import ObjectDoesNotExist
from django.core.validators import ValidationError, validate_email
from django.utils.translation import override as override_language
from django.utils.translation import gettext as _
from eventtracking import tracker
from pytz import UTC
from common.djangoapps.student import views as student_views
from common.djangoapps.student.models import (
@@ -679,11 +680,14 @@ def _validate_password(password, username=None, email=None, reset_password_page=
(settings.ENABLE_AUTHN_RESET_PASSWORD_HIBP_POLICY and reset_password_page) or
(settings.ENABLE_AUTHN_REGISTER_HIBP_POLICY and not reset_password_page)
):
pwned_response = check_pwned_password(password)
if pwned_response.get('vulnerability', 'no') == 'yes':
pwned_properties = check_pwned_password(password)
if pwned_properties.get('vulnerability', 'no') == 'yes':
if reset_password_page is False:
pwned_properties['user_request_page'] = 'registration'
tracker.emit('edx.bi.user.pwned.password.status', pwned_properties)
if (
reset_password_page or
pwned_response.get('frequency', 0) >= settings.HIBP_REGISTRATION_PASSWORD_FREQUENCY_THRESHOLD
pwned_properties.get('frequency', 0) >= settings.HIBP_REGISTRATION_PASSWORD_FREQUENCY_THRESHOLD
):
raise errors.AccountPasswordInvalid(accounts.AUTHN_PASSWORD_COMPROMISED_MSG)

View File

@@ -24,7 +24,12 @@ log = logging.getLogger('edx.celery.task')
@shared_task
@set_code_owner_attribute
def check_pwned_password_and_send_track_event(user_id, password, internal_user=False, is_new_user=False):
def check_pwned_password_and_send_track_event(
user_id, password,
internal_user=False,
is_new_user=False,
request_page=''
):
"""
Check the Pwned Databases and send its event to Segment.
"""
@@ -33,6 +38,7 @@ def check_pwned_password_and_send_track_event(user_id, password, internal_user=F
if pwned_properties:
pwned_properties['internal_user'] = internal_user
pwned_properties['new_user'] = is_new_user
pwned_properties['user_request_page'] = request_page
segment.track(user_id, 'edx.bi.user.pwned.password.status', pwned_properties)
return pwned_properties
except Exception: # pylint: disable=W0703

View File

@@ -591,7 +591,10 @@ def login_user(request, api_version='v1'): # pylint: disable=too-many-statement
_handle_failed_authentication(user, possibly_authenticated_user)
pwned_properties = check_pwned_password_and_send_track_event(
user.id, request.POST.get('password'), user.is_staff
user_id=user.id,
password=request.POST.get('password'),
internal_user=user.is_staff,
request_page='login'
) if not is_user_third_party_authenticated else {}
# Set default for third party login
password_frequency = pwned_properties.get('frequency', -1)

View File

@@ -288,7 +288,13 @@ def create_account_with_params(request, params): # pylint: disable=too-many-sta
def is_new_user(password, user):
if user is not None:
AUDIT_LOG.info(f"Login success on new account creation - {user.username}")
check_pwned_password_and_send_track_event.delay(user.id, password, user.is_staff, True)
check_pwned_password_and_send_track_event.delay(
user_id=user.id,
password=password,
internal_user=user.is_staff,
is_new_user=True,
request_page='registration'
)
def _link_user_to_third_party_provider(

View File

@@ -4,6 +4,7 @@ Objects and utilities used to construct registration forms.
import copy
from importlib import import_module
from eventtracking import tracker
import re
from django import forms
@@ -241,12 +242,14 @@ class AccountCreationForm(forms.Form):
if settings.ENABLE_AUTHN_REGISTER_HIBP_POLICY:
# Checks the Pwned Databases for password vulnerability.
pwned_response = check_pwned_password(password)
pwned_properties = check_pwned_password(password)
if (
pwned_response.get('vulnerability', 'no') == 'yes' and
pwned_response.get('frequency', 0) >= settings.HIBP_REGISTRATION_PASSWORD_FREQUENCY_THRESHOLD
pwned_properties.get('vulnerability', 'no') == 'yes' and
pwned_properties.get('frequency', 0) >= settings.HIBP_REGISTRATION_PASSWORD_FREQUENCY_THRESHOLD
):
pwned_properties['user_request_page'] = 'registration'
tracker.emit('edx.bi.user.pwned.password.status', pwned_properties)
raise ValidationError(accounts.AUTHN_PASSWORD_COMPROMISED_MSG)
return password

View File

@@ -25,6 +25,7 @@ from openedx.core.djangoapps.site_configuration.tests.test_util import with_site
from openedx.core.djangoapps.user_api.accounts import (
AUTHN_EMAIL_CONFLICT_MSG,
AUTHN_EMAIL_INVALID_MSG,
AUTHN_PASSWORD_COMPROMISED_MSG,
AUTHN_USERNAME_CONFLICT_MSG,
EMAIL_BAD_LENGTH_MSG,
EMAIL_MAX_LENGTH,
@@ -2320,6 +2321,40 @@ class RegistrationViewTestV2(RegistrationViewTestV1):
assert response.status_code == 200
@override_settings(
ENABLE_AUTHN_REGISTER_HIBP_POLICY=True
)
@mock.patch('eventtracking.tracker.emit')
@mock.patch(
'openedx.core.djangoapps.user_authn.views.registration_form.check_pwned_password',
mock.Mock(return_value={
'vulnerability': 'yes',
'frequency': 3,
'user_request_page': 'registration',
})
)
def test_register_error_with_pwned_password(self, emit):
post_params = {
"email": self.EMAIL,
"name": self.NAME,
"username": self.USERNAME,
"password": self.PASSWORD,
"honor_code": "true",
}
response = self.client.post(
self.url,
post_params,
HTTP_ACCEPT='*/*',
)
emit.assert_called_with(
'edx.bi.user.pwned.password.status',
{
'frequency': 3,
'vulnerability': 'yes',
'user_request_page': 'registration',
})
assert response.status_code == 400
@httpretty.activate
@ddt.ddt
@@ -2812,3 +2847,28 @@ class RegistrationValidationViewTests(test_utils.ApiTestCase, OpenEdxEventsTestM
{'username': 'user', 'email': 'user@email.com', 'is_authn_mfe': True, 'form_field_key': 'email'},
{'email': AUTHN_EMAIL_CONFLICT_MSG}
)
@override_settings(
ENABLE_AUTHN_REGISTER_HIBP_POLICY=True
)
@mock.patch('eventtracking.tracker.emit')
@mock.patch(
'openedx.core.djangoapps.user_api.accounts.api.check_pwned_password',
mock.Mock(return_value={
'vulnerability': 'yes',
'frequency': 3,
'user_request_page': 'registration',
})
)
def test_pwned_password_and_emit_track_event(self, emit):
self.assertValidationDecision(
{'password': 'testtest12'},
{'password': AUTHN_PASSWORD_COMPROMISED_MSG}
)
emit.assert_called_with(
'edx.bi.user.pwned.password.status',
{
'frequency': 3,
'vulnerability': 'yes',
'user_request_page': 'registration',
})