@contextmanager
def track_memory_usage(metric, course_id):
    """
    Context manager to track how much memory (in bytes) a given process uses.

    Metrics will look like: 'course_email.subtask_generation.memory.rss'
    or 'course_email.subtask_generation.memory.vms'.

    Arguments:
        `metric` : prefix of the metric name; '.rss' and '.vms' are appended to it.
        `course_id` : reported as a 'course_id:<course_id>' tag on each metric.

    The value reported for each memory type is the usage measured when the
    wrapped block exits minus the usage measured when it was entered, so it
    can be negative if the process released memory in between.  Nothing is
    reported when the wrapped block raises, because the exception propagates
    out of the `yield` before the reporting code runs.
    """
    memory_types = ['rss', 'vms']
    process = psutil.Process()
    # psutil 2.0 renamed Process.get_memory_info() to Process.memory_info()
    # and psutil 3.0 removed the old name.  Resolve whichever one exists so
    # this keeps working across psutil versions.
    read_memory_info = getattr(process, 'memory_info', None) or process.get_memory_info
    baseline_memory_info = read_memory_info()
    yield
    # Take a single snapshot on exit so that the rss and vms deltas describe
    # the same instant (and the process is only queried once).
    total_memory_info = read_memory_info()
    for memory_type in memory_types:
        baseline_usage = getattr(baseline_memory_info, memory_type)
        total_usage = getattr(total_memory_info, memory_type)
        memory_used = total_usage - baseline_usage
        dog_stats_api.increment(
            metric + "." + memory_type,
            memory_used,
            tags=["course_id:{}".format(course_id)],
        )
Therefore it's @@ -269,19 +307,20 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys # Construct a generator that will return the recipients to use for each subtask. # Pass in the desired fields to fetch for each recipient. - item_generator = _generate_items_for_subtask( + item_list_generator = _generate_items_for_subtask( item_queryset, item_fields, total_num_items, items_per_task, total_num_subtasks, + entry.course_id, ) # Now create the subtasks, and start them running. TASK_LOG.info("Task %s: creating %s subtasks to process %s items.", task_id, total_num_subtasks, total_num_items) num_subtasks = 0 - for item_list in item_generator: + for item_list in item_list_generator: subtask_id = subtask_id_list[num_subtasks] num_subtasks += 1 subtask_status = SubtaskStatus.create(subtask_id)