From 95a112a3dd0fd51593080e623e3ff95d7ebd441c Mon Sep 17 00:00:00 2001 From: Usman Khalid <2200617@gmail.com> Date: Fri, 31 Jan 2014 23:17:42 +0500 Subject: [PATCH] instructor_task: Distribute extra items among subtasks of last query. When creating an instructor task total_num_items may change between the time it and the number of subtasks is calculated and the time the subtasks are actually queued (all of this cannot happen in one transaction). In such a case the extra items are distributed among the subtasks of the last query. LMS-2090 --- lms/djangoapps/instructor_task/subtasks.py | 42 ++++---- .../instructor_task/tests/test_base.py | 2 +- .../instructor_task/tests/test_subtasks.py | 95 +++++++++++++++++++ 3 files changed, 122 insertions(+), 17 deletions(-) create mode 100644 lms/djangoapps/instructor_task/tests/test_subtasks.py diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index e4d94cd671..93b4fd27a8 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -51,7 +51,7 @@ def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task): return total_num_tasks -def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task): +def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, total_num_subtasks, items_per_query, items_per_task): """ Generates a chunk of "items" that should be passed into a subtask. @@ -68,15 +68,29 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed. """ num_queries = int(math.ceil(float(total_num_items) / float(items_per_query))) - last_pk = item_queryset[0].pk - 1 + last_pk = item_queryset.order_by('pk')[0].pk - 1 num_items_queued = 0 + available_num_subtasks = total_num_subtasks all_item_fields = list(item_fields) all_item_fields.append('pk') - for _ in range(num_queries): - item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query]) + + for query_number in range(num_queries): + # In case total_num_items has increased since it was initially calculated + # include all remaining items in last query. + item_sublist = item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields) + if query_number < num_queries - 1: + item_sublist = list(item_sublist[:items_per_query]) + else: + item_sublist = list(item_sublist) + last_pk = item_sublist[-1]['pk'] num_items_this_query = len(item_sublist) - num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task))) + + # In case total_num_items has increased since it was initially calculated just distribute the extra + # items among the available subtasks. + num_tasks_this_query = min(available_num_subtasks, int(math.ceil(float(num_items_this_query) / float(items_per_task)))) + available_num_subtasks -= num_tasks_this_query + chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query))) for i in range(num_tasks_this_query): items_for_task = item_sublist[i * chunk:i * chunk + chunk] @@ -84,11 +98,12 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite num_items_queued += num_items_this_query - # Sanity check: we expect the chunking to be properly summing to the original count: + # Because queueing does not happen in one transaction the number of items in the queryset may change + # from the initial count. For example if the queryset is of the CourseEnrollment model students may + # enroll or unenroll while queueing is in progress. The purpose of the original count is to estimate the + # number of subtasks needed to perform the requested task. if num_items_queued != total_num_items: - error_msg = "Number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items) - TASK_LOG.error(error_msg) - raise ValueError(error_msg) + TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items) class SubtaskStatus(object): @@ -278,6 +293,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys item_queryset, item_fields, total_num_items, + total_num_subtasks, items_per_query, items_per_task ) @@ -293,13 +309,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys new_subtask = create_subtask_fcn(item_list, subtask_status) new_subtask.apply_async() - # Sanity check: we expect the subtask to be properly summing to the original count: - if num_subtasks != len(subtask_id_list): - task_id = entry.task_id - error_fmt = "Task {}: number of tasks generated {} not equal to original total {}" - error_msg = error_fmt.format(task_id, num_subtasks, len(subtask_id_list)) - TASK_LOG.error(error_msg) - raise ValueError(error_msg) + # Subtasks have been queued so no exceptions should be raised after this point. # Return the task progress as stored in the InstructorTask object. return progress diff --git a/lms/djangoapps/instructor_task/tests/test_base.py b/lms/djangoapps/instructor_task/tests/test_base.py index e0abfdf51f..6ee8d3dc3d 100644 --- a/lms/djangoapps/instructor_task/tests/test_base.py +++ b/lms/djangoapps/instructor_task/tests/test_base.py @@ -28,7 +28,7 @@ from instructor_task.views import instructor_task_status TEST_COURSE_ORG = 'edx' -TEST_COURSE_NAME = 'Test Course' +TEST_COURSE_NAME = 'test course' TEST_COURSE_NUMBER = '1.23x' TEST_SECTION_NAME = "Problem" TEST_COURSE_ID = 'edx/1.23x/test_course' diff --git a/lms/djangoapps/instructor_task/tests/test_subtasks.py b/lms/djangoapps/instructor_task/tests/test_subtasks.py new file mode 100644 index 0000000000..56660ec660 --- /dev/null +++ b/lms/djangoapps/instructor_task/tests/test_subtasks.py @@ -0,0 +1,95 @@ +""" +Unit tests for instructor_task subtasks. +""" +from uuid import uuid4 + +from mock import Mock, patch + +from student.models import CourseEnrollment + +from instructor_task.subtasks import queue_subtasks_for_query +from instructor_task.tests.factories import InstructorTaskFactory +from instructor_task.tests.test_base import InstructorTaskCourseTestCase + + +class TestSubtasks(InstructorTaskCourseTestCase): + """Tests for subtasks.""" + + def setUp(self): + super(TestSubtasks, self).setUp() + self.initialize_course() + + def _enroll_students_in_course(self, course_id, num_students): + """Create and enroll some students in the course.""" + + for _ in range(num_students): + random_id = uuid4().hex[:8] + self.create_student(username='student{0}'.format(random_id)) + + def _queue_subtasks(self, create_subtask_fcn, items_per_query, items_per_task, initial_count, extra_count): + """Queue subtasks while enrolling more students into course in the middle of the process.""" + + task_id = str(uuid4()) + instructor_task = InstructorTaskFactory.create( + course_id=self.course.id, + task_id=task_id, + task_key='dummy_task_key', + task_type='bulk_course_email', + ) + + self._enroll_students_in_course(self.course.id, initial_count) + task_queryset = CourseEnrollment.objects.filter(course_id=self.course.id) + + def initialize_subtask_info(*args): # pylint: disable=unused-argument + """Instead of initializing subtask info enroll some more students into course.""" + self._enroll_students_in_course(self.course.id, extra_count) + return {} + + with patch('instructor_task.subtasks.initialize_subtask_info') as mock_initialize_subtask_info: + mock_initialize_subtask_info.side_effect = initialize_subtask_info + queue_subtasks_for_query( + entry=instructor_task, + action_name='action_name', + create_subtask_fcn=create_subtask_fcn, + item_queryset=task_queryset, + item_fields=[], + items_per_query=items_per_query, + items_per_task=items_per_task, + ) + + def test_queue_subtasks_for_query1(self): + """Test queue_subtasks_for_query() if in last query the subtasks only need to accommodate < items_per_tasks items.""" + + mock_create_subtask_fcn = Mock() + self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 1) + + # Check number of items for each subtask + mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list + self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 3) + + def test_queue_subtasks_for_query2(self): + """Test queue_subtasks_for_query() if in last query the subtasks need to accommodate > items_per_task items.""" + + mock_create_subtask_fcn = Mock() + self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 3) + + # Check number of items for each subtask + mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list + self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 5) + + def test_queue_subtasks_for_query3(self): + """Test queue_subtasks_for_query() if in last query the number of items available > items_per_query.""" + + mock_create_subtask_fcn = Mock() + self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 11, 3) + + # Check number of items for each subtask + mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list + self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 4) + self.assertEqual(len(mock_create_subtask_fcn_args[3][0][0]), 4)