From 8c8a567dd0284bf820b0630b95c66399637dcef9 Mon Sep 17 00:00:00 2001 From: Eemaan Amir <57627710+eemaanamir@users.noreply.github.com> Date: Thu, 15 May 2025 17:02:37 +0500 Subject: [PATCH] feat: added batching to managemnet command to avoid queueing errors (#36720) * feat: added batching to managemnet command to avoid queueing errors * fix: refactored code to fix pylint errors * fix: fixed quality check errors * fix: fixed quality check errors --- .../commands/backfill_is_disabled.py | 112 +++++++++++++++--- 1 file changed, 95 insertions(+), 17 deletions(-) diff --git a/common/djangoapps/student/management/commands/backfill_is_disabled.py b/common/djangoapps/student/management/commands/backfill_is_disabled.py index e0ca41a8c0..b941bbc5cb 100644 --- a/common/djangoapps/student/management/commands/backfill_is_disabled.py +++ b/common/djangoapps/student/management/commands/backfill_is_disabled.py @@ -8,11 +8,13 @@ batches to minimize memory usage and supports a dry-run mode for testing. """ import logging +import time from django.core.management.base import BaseCommand from django.contrib.auth import get_user_model from django.contrib.auth.hashers import UNUSABLE_PASSWORD_PREFIX from django.db import DatabaseError from common.djangoapps.track import segment +import requests LOGGER = logging.getLogger(__name__) User = get_user_model() @@ -28,7 +30,7 @@ class Command(BaseCommand): parser.add_argument( '--batch-size', type=int, - default=100, + default=9000, help='Number of users to process per batch' ) parser.add_argument( @@ -36,19 +38,81 @@ class Command(BaseCommand): action='store_true', help='Simulate the back fill without calling Segment' ) + parser.add_argument( + '--retry-limit', + type=int, default=3, + help='Retry attempts for failed API calls' + ) + + def _process_user(self, user_id, batch_number, dry_run): + """Process a single user, logging success or failure.""" + if dry_run: + LOGGER.info( + f"[Dry Run] Would update user {user_id} with is_disabled=true " + f"in batch {batch_number}" + ) + return True + try: + segment.analytics.identify(user_id=user_id, traits={'is_disabled': True}) + LOGGER.info( + f"Successfully updated user {user_id} with is_disabled=true " + f"in batch {batch_number}" + ) + return True + except (ConnectionError, ValueError) as e: + LOGGER.error( + f"Failed to update user {user_id} in batch {batch_number}: " + f"{str(e)}" + ) + return False + + def _process_batch(self, users_batch, batch_number, dry_run, retry_limit): + """Process a batch of users with retries.""" + current_batch_size = len(users_batch) + if dry_run: + for user_id in users_batch: + self._process_user(user_id, batch_number, dry_run) + return current_batch_size + + retry_count = 0 + success = False + while not success and retry_count <= retry_limit: + try: + for user_id in users_batch: + self._process_user(user_id, batch_number, dry_run) + segment.analytics.flush() + LOGGER.info(f"Successfully processed batch {batch_number}") + success = True + return current_batch_size + except (requests.exceptions.RequestException, segment.analytics.errors.APIError) as e: + retry_count += 1 + if retry_count <= retry_limit: + LOGGER.warning( + f"Batch {batch_number} failed (attempt {retry_count}/" + f"{retry_limit}): {str(e)}" + ) + time.sleep(2 * retry_count) + else: + LOGGER.error( + f"Batch {batch_number} failed after {retry_limit} attempts, " + f"processed {{processed}} users: {str(e)}" + ) + return None def handle(self, *args, **options): batch_size = options['batch_size'] dry_run = options['dry_run'] + retry_limit = options['retry_limit'] try: - LOGGER.info(f"Starting back fill with batch_size={batch_size}, dry_run={dry_run}") + LOGGER.info( + f"Starting backfill (batch_size={batch_size}, dry_run={dry_run}, " + f"retry_limit={retry_limit})" + ) - queryset = User.objects.filter( + total_users = User.objects.filter( password__startswith=UNUSABLE_PASSWORD_PREFIX - ).values('id', 'password') - - total_users = queryset.count() + ).count() if total_users == 0: LOGGER.info("No users to process, exiting") @@ -56,19 +120,33 @@ class Command(BaseCommand): LOGGER.info(f"Found {total_users} users that are disabled") + offset = 0 processed = 0 - for user in queryset.iterator(chunk_size=batch_size): - try: - if dry_run: - LOGGER.info(f"[Dry Run] Would update user {user['id']} with is_disabled=true") - else: - segment.identify(user['id'], {'is_disabled': 'true'}) - LOGGER.info(f"Successfully updated user {user['id']} with is_disabled=true") - processed += 1 - except (ConnectionError, ValueError) as e: - LOGGER.error(f"Failed to update user {user['id']}: {str(e)}") + batch_number = 0 - LOGGER.info(f"Back fill completed: processed {processed}/{total_users} users") + while offset < total_users: + batch_number += 1 + users_batch = User.objects.filter( + password__startswith=UNUSABLE_PASSWORD_PREFIX + ).values_list('id', flat=True)[offset:offset + batch_size] + LOGGER.info(f"Processing batch {batch_number} ({len(users_batch)} users)") + + batch_result = self._process_batch( + users_batch, batch_number, dry_run, retry_limit + ) + if batch_result is None: + LOGGER.error( + f"Backfill stopped, processed {processed} users" + ) + return + processed += batch_result + offset += batch_size + LOGGER.info(f"Processed {processed} / {total_users} users") + + LOGGER.info( + f"Completed: processed {processed} / {total_users} users in " + f"{batch_number} batches" + ) except DatabaseError as e: LOGGER.error(f"Back fill failed: {str(e)}")