Merge pull request #4784 from edx/adam/track-bulk-email-memory
Adam/track bulk email memory
This commit is contained in:
@@ -5,6 +5,8 @@ from time import time
|
||||
import json
|
||||
from uuid import uuid4
|
||||
import math
|
||||
import psutil
|
||||
from contextlib import contextmanager
|
||||
|
||||
from celery.utils.log import get_task_logger
|
||||
from celery.states import SUCCESS, READY_STATES, RETRY
|
||||
@@ -39,10 +41,43 @@ def _get_number_of_subtasks(total_num_items, items_per_task):
|
||||
The number of subtask_id values returned by this should match the number of chunks returned
|
||||
by the generate_items_for_subtask generator.
|
||||
"""
|
||||
return int(math.ceil(float(total_num_items) / float(items_per_task)))
|
||||
num_subtasks, remainder = divmod(total_num_items, items_per_task)
|
||||
if remainder:
|
||||
num_subtasks += 1
|
||||
return num_subtasks
|
||||
|
||||
|
||||
def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_task, total_num_subtasks):
|
||||
@contextmanager
def track_memory_usage(metric, course_id):
    """
    Context manager to track how much memory (in bytes) a given process uses.

    Metrics will look like: 'course_email.subtask_generation.memory.rss'
    or 'course_email.subtask_generation.memory.vms'.
    """
    # Take a single memory snapshot before the wrapped block runs, so both
    # memory types share the same baseline instant.
    process = psutil.Process()
    snapshot = process.get_memory_info()
    baselines = [(memory_type, getattr(snapshot, memory_type)) for memory_type in ('rss', 'vms')]

    yield

    # After the wrapped block finishes, report the per-type delta.  A fresh
    # reading is taken for each type, matching the original sampling order.
    for memory_type, baseline in baselines:
        current = getattr(process.get_memory_info(), memory_type)
        dog_stats_api.increment(
            metric + "." + memory_type,
            current - baseline,
            tags=["course_id:{}".format(course_id)],
        )
|
||||
|
||||
|
||||
def _generate_items_for_subtask(
|
||||
item_queryset,
|
||||
item_fields,
|
||||
total_num_items,
|
||||
items_per_task,
|
||||
total_num_subtasks,
|
||||
course_id,
|
||||
):
|
||||
"""
|
||||
Generates a chunk of "items" that should be passed into a subtask.
|
||||
|
||||
@@ -53,6 +88,7 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
|
||||
`total_num_items` : the result of item_queryset.count().
|
||||
`items_per_query` : size of chunks to break the query operation into.
|
||||
`items_per_task` : maximum size of chunks to break each query chunk into for use by a subtask.
|
||||
`course_id` : course_id of the course. Only needed for the track_memory_usage context manager.
|
||||
|
||||
Returns: yields a list of dicts, where each dict contains the fields in `item_fields`, plus the 'pk' field.
|
||||
|
||||
@@ -64,18 +100,20 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite
|
||||
num_subtasks = 0
|
||||
|
||||
items_for_task = []
|
||||
for item in item_queryset.values(*all_item_fields).iterator():
|
||||
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
|
||||
yield items_for_task
|
||||
num_items_queued += items_per_task
|
||||
items_for_task = []
|
||||
num_subtasks += 1
|
||||
items_for_task.append(item)
|
||||
|
||||
# yield remainder items for task, if any
|
||||
if items_for_task:
|
||||
yield items_for_task
|
||||
num_items_queued += len(items_for_task)
|
||||
with track_memory_usage('course_email.subtask_generation.memory', course_id):
|
||||
for item in item_queryset.values(*all_item_fields).iterator():
|
||||
if len(items_for_task) == items_per_task and num_subtasks < total_num_subtasks - 1:
|
||||
yield items_for_task
|
||||
num_items_queued += items_per_task
|
||||
items_for_task = []
|
||||
num_subtasks += 1
|
||||
items_for_task.append(item)
|
||||
|
||||
# yield remainder items for task, if any
|
||||
if items_for_task:
|
||||
yield items_for_task
|
||||
num_items_queued += len(items_for_task)
|
||||
|
||||
# Note, depending on what kind of DB is used, it's possible for the queryset
|
||||
# we iterate over to change in the course of the query. Therefore it's
|
||||
@@ -269,19 +307,20 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys
|
||||
|
||||
# Construct a generator that will return the recipients to use for each subtask.
|
||||
# Pass in the desired fields to fetch for each recipient.
|
||||
item_generator = _generate_items_for_subtask(
|
||||
item_list_generator = _generate_items_for_subtask(
|
||||
item_queryset,
|
||||
item_fields,
|
||||
total_num_items,
|
||||
items_per_task,
|
||||
total_num_subtasks,
|
||||
entry.course_id,
|
||||
)
|
||||
|
||||
# Now create the subtasks, and start them running.
|
||||
TASK_LOG.info("Task %s: creating %s subtasks to process %s items.",
|
||||
task_id, total_num_subtasks, total_num_items)
|
||||
num_subtasks = 0
|
||||
for item_list in item_generator:
|
||||
for item_list in item_list_generator:
|
||||
subtask_id = subtask_id_list[num_subtasks]
|
||||
num_subtasks += 1
|
||||
subtask_status = SubtaskStatus.create(subtask_id)
|
||||
|
||||
Reference in New Issue
Block a user