diff --git a/lms/djangoapps/instructor_task/subtasks.py b/lms/djangoapps/instructor_task/subtasks.py index e4d94cd671..93b4fd27a8 100644 --- a/lms/djangoapps/instructor_task/subtasks.py +++ b/lms/djangoapps/instructor_task/subtasks.py @@ -51,7 +51,7 @@ def _get_number_of_subtasks(total_num_items, items_per_query, items_per_task): return total_num_tasks -def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, items_per_query, items_per_task): +def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, total_num_subtasks, items_per_query, items_per_task): """ Generates a chunk of "items" that should be passed into a subtask. @@ -68,15 +68,29 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite Warning: if the algorithm here changes, the _get_number_of_subtasks() method should similarly be changed. """ num_queries = int(math.ceil(float(total_num_items) / float(items_per_query))) - last_pk = item_queryset[0].pk - 1 + last_pk = item_queryset.order_by('pk')[0].pk - 1 num_items_queued = 0 + available_num_subtasks = total_num_subtasks all_item_fields = list(item_fields) all_item_fields.append('pk') - for _ in range(num_queries): - item_sublist = list(item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields)[:items_per_query]) + + for query_number in range(num_queries): + # In case total_num_items has increased since it was initially calculated + # include all remaining items in last query. 
+ item_sublist = item_queryset.order_by('pk').filter(pk__gt=last_pk).values(*all_item_fields) + if query_number < num_queries - 1: + item_sublist = list(item_sublist[:items_per_query]) + else: + item_sublist = list(item_sublist) + last_pk = item_sublist[-1]['pk'] num_items_this_query = len(item_sublist) - num_tasks_this_query = int(math.ceil(float(num_items_this_query) / float(items_per_task))) + + # In case total_num_items has increased since it was initially calculated just distribute the extra + # items among the available subtasks. + num_tasks_this_query = min(available_num_subtasks, int(math.ceil(float(num_items_this_query) / float(items_per_task)))) + available_num_subtasks -= num_tasks_this_query + chunk = int(math.ceil(float(num_items_this_query) / float(num_tasks_this_query))) for i in range(num_tasks_this_query): items_for_task = item_sublist[i * chunk:i * chunk + chunk] @@ -84,11 +98,12 @@ def _generate_items_for_subtask(item_queryset, item_fields, total_num_items, ite num_items_queued += num_items_this_query - # Sanity check: we expect the chunking to be properly summing to the original count: + # Because queueing does not happen in one transaction the number of items in the queryset may change + # from the initial count. For example if the queryset is of the CourseEnrollment model students may + # enroll or unenroll while queueing is in progress. The purpose of the original count is to estimate the + # number of subtasks needed to perform the requested task. 
if num_items_queued != total_num_items: - error_msg = "Number of items generated by chunking {} not equal to original total {}".format(num_items_queued, total_num_items) - TASK_LOG.error(error_msg) - raise ValueError(error_msg) + TASK_LOG.info("Number of items generated by chunking %s not equal to original total %s", num_items_queued, total_num_items) class SubtaskStatus(object): @@ -278,6 +293,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys item_queryset, item_fields, total_num_items, + total_num_subtasks, items_per_query, items_per_task ) @@ -293,13 +309,7 @@ def queue_subtasks_for_query(entry, action_name, create_subtask_fcn, item_querys new_subtask = create_subtask_fcn(item_list, subtask_status) new_subtask.apply_async() - # Sanity check: we expect the subtask to be properly summing to the original count: - if num_subtasks != len(subtask_id_list): - task_id = entry.task_id - error_fmt = "Task {}: number of tasks generated {} not equal to original total {}" - error_msg = error_fmt.format(task_id, num_subtasks, len(subtask_id_list)) - TASK_LOG.error(error_msg) - raise ValueError(error_msg) + # Subtasks have been queued so no exceptions should be raised after this point. # Return the task progress as stored in the InstructorTask object. 
return progress diff --git a/lms/djangoapps/instructor_task/tests/test_base.py b/lms/djangoapps/instructor_task/tests/test_base.py index e0abfdf51f..6ee8d3dc3d 100644 --- a/lms/djangoapps/instructor_task/tests/test_base.py +++ b/lms/djangoapps/instructor_task/tests/test_base.py @@ -28,7 +28,7 @@ from instructor_task.views import instructor_task_status TEST_COURSE_ORG = 'edx' -TEST_COURSE_NAME = 'Test Course' +TEST_COURSE_NAME = 'test course' TEST_COURSE_NUMBER = '1.23x' TEST_SECTION_NAME = "Problem" TEST_COURSE_ID = 'edx/1.23x/test_course' diff --git a/lms/djangoapps/instructor_task/tests/test_subtasks.py b/lms/djangoapps/instructor_task/tests/test_subtasks.py new file mode 100644 index 0000000000..56660ec660 --- /dev/null +++ b/lms/djangoapps/instructor_task/tests/test_subtasks.py @@ -0,0 +1,95 @@ +""" +Unit tests for instructor_task subtasks. +""" +from uuid import uuid4 + +from mock import Mock, patch + +from student.models import CourseEnrollment + +from instructor_task.subtasks import queue_subtasks_for_query +from instructor_task.tests.factories import InstructorTaskFactory +from instructor_task.tests.test_base import InstructorTaskCourseTestCase + + +class TestSubtasks(InstructorTaskCourseTestCase): + """Tests for subtasks.""" + + def setUp(self): + super(TestSubtasks, self).setUp() + self.initialize_course() + + def _enroll_students_in_course(self, course_id, num_students): + """Create and enroll some students in the course.""" + + for _ in range(num_students): + random_id = uuid4().hex[:8] + self.create_student(username='student{0}'.format(random_id)) + + def _queue_subtasks(self, create_subtask_fcn, items_per_query, items_per_task, initial_count, extra_count): + """Queue subtasks while enrolling more students into course in the middle of the process.""" + + task_id = str(uuid4()) + instructor_task = InstructorTaskFactory.create( + course_id=self.course.id, + task_id=task_id, + task_key='dummy_task_key', + task_type='bulk_course_email', + ) + + 
self._enroll_students_in_course(self.course.id, initial_count) + task_queryset = CourseEnrollment.objects.filter(course_id=self.course.id) + + def initialize_subtask_info(*args): # pylint: disable=unused-argument + """Instead of initializing subtask info enroll some more students into course.""" + self._enroll_students_in_course(self.course.id, extra_count) + return {} + + with patch('instructor_task.subtasks.initialize_subtask_info') as mock_initialize_subtask_info: + mock_initialize_subtask_info.side_effect = initialize_subtask_info + queue_subtasks_for_query( + entry=instructor_task, + action_name='action_name', + create_subtask_fcn=create_subtask_fcn, + item_queryset=task_queryset, + item_fields=[], + items_per_query=items_per_query, + items_per_task=items_per_task, + ) + + def test_queue_subtasks_for_query1(self): + """Test queue_subtasks_for_query() if in last query the subtasks only need to accommodate < items_per_task items.""" + + mock_create_subtask_fcn = Mock() + self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 1) + + # Check number of items for each subtask + mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list + self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 3) + + def test_queue_subtasks_for_query2(self): + """Test queue_subtasks_for_query() if in last query the subtasks need to accommodate > items_per_task items.""" + + mock_create_subtask_fcn = Mock() + self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 8, 3) + + # Check number of items for each subtask + mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list + self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 5) + + def test_queue_subtasks_for_query3(self): + """Test 
queue_subtasks_for_query() if in last query the number of items available > items_per_query.""" + + mock_create_subtask_fcn = Mock() + self._queue_subtasks(mock_create_subtask_fcn, 6, 3, 11, 3) + + # Check number of items for each subtask + mock_create_subtask_fcn_args = mock_create_subtask_fcn.call_args_list + self.assertEqual(len(mock_create_subtask_fcn_args[0][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[1][0][0]), 3) + self.assertEqual(len(mock_create_subtask_fcn_args[2][0][0]), 4) + self.assertEqual(len(mock_create_subtask_fcn_args[3][0][0]), 4)