From 6dfa47b2fc57981c520a3b49aa8736275014af2c Mon Sep 17 00:00:00 2001 From: Alex Dusenbery Date: Wed, 21 Jun 2017 15:29:22 -0400 Subject: [PATCH] EDUCATOR-565 | Add POLICY_CHANGE_GRADES_ROUTING_KEY, fix errors in compute_all_grades_for_course circuitry. --- cms/celery.py | 12 ++++++++- .../contentstore/signals/handlers.py | 12 ++++----- .../contentstore/signals/signals.py | 2 +- .../tests/test_course_settings.py | 24 ++++++++--------- .../models/settings/course_grading.py | 2 +- cms/envs/aws.py | 4 +++ cms/envs/common.py | 3 +++ lms/djangoapps/grades/tasks.py | 27 +++++++++---------- lms/envs/aws.py | 3 +++ lms/envs/common.py | 3 +++ openedx/core/lib/celery/routers.py | 13 ++++++++- 11 files changed, 69 insertions(+), 36 deletions(-) diff --git a/cms/celery.py b/cms/celery.py index 0d7505cfa3..fd3235a529 100644 --- a/cms/celery.py +++ b/cms/celery.py @@ -34,8 +34,18 @@ class Router(AlternateEnvironmentRouter): """ Defines alternate environment tasks, as a dict of form { task_name: alternate_queue } """ + # The tasks below will be routed to the default lms queue. return { 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache': 'lms', 'openedx.core.djangoapps.content.block_structure.tasks.update_course_in_cache_v2': 'lms', - 'lms.djangoapps.grades.tasks.compute_all_grades_for_course': 'lms', + } + + @property + def explicit_queues(self): + """ + Defines specific queues for tasks to run in (typically outside of the cms environment), + as a dict of form { task_name: queue_name }. + """ + return { + 'lms.djangoapps.grades.tasks.compute_all_grades_for_course': settings.POLICY_CHANGE_GRADES_ROUTING_KEY, } diff --git a/cms/djangoapps/contentstore/signals/handlers.py b/cms/djangoapps/contentstore/signals/handlers.py index 6736c03f9b..0fd23d8ebf 100644 --- a/cms/djangoapps/contentstore/signals/handlers.py +++ b/cms/djangoapps/contentstore/signals/handlers.py @@ -95,12 +95,12 @@ def handle_grading_policy_changed(sender, **kwargs): """ Receives signal and kicks off celery task to recalculate grades """ - course_key = kwargs.get('course_key') - result = compute_all_grades_for_course.apply_async( - course_key=course_key, - event_transaction_id=get_event_transaction_id(), - event_transaction_type=get_event_transaction_type(), - ) + kwargs = { + 'course_key': unicode(kwargs.get('course_key')), + 'event_transaction_id': unicode(get_event_transaction_id()), + 'event_transaction_type': unicode(get_event_transaction_type()), + } + result = compute_all_grades_for_course.apply_async(kwargs=kwargs) log.info("Grades: Created {task_name}[{task_id}] with arguments {kwargs}".format( task_name=compute_all_grades_for_course.name, task_id=result.task_id, diff --git a/cms/djangoapps/contentstore/signals/signals.py b/cms/djangoapps/contentstore/signals/signals.py index 87f6994165..f73b13b9ea 100644 --- a/cms/djangoapps/contentstore/signals/signals.py +++ b/cms/djangoapps/contentstore/signals/signals.py @@ -9,6 +9,6 @@ from django.dispatch import Signal GRADING_POLICY_CHANGED = Signal( providing_args=[ 'user_id', # Integer User ID - 'course_id', # Unicode string representing the course + 'course_key', # Unicode string representing the course ] ) diff --git a/cms/djangoapps/contentstore/tests/test_course_settings.py b/cms/djangoapps/contentstore/tests/test_course_settings.py index 6a0acb5d0a..fdd14d9bb0 100644 --- a/cms/djangoapps/contentstore/tests/test_course_settings.py +++ b/cms/djangoapps/contentstore/tests/test_course_settings.py @@ -455,11 +455,11 @@ class CourseGradingTest(CourseTestCase): # one for each of the calls to update_from_json() send_signal.assert_has_calls([ - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), ]) # one for each of the calls to update_from_json(); the last update doesn't actually change the parts of the @@ -505,9 +505,9 @@ class CourseGradingTest(CourseTestCase): # one for each of the calls to update_grader_from_json() send_signal.assert_has_calls([ - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), ]) # one for each of the calls to update_grader_from_json() @@ -620,8 +620,8 @@ class CourseGradingTest(CourseTestCase): # one for each call to update_section_grader_type() send_signal.assert_has_calls([ - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), ]) tracker.emit.assert_has_calls([ @@ -698,9 +698,9 @@ class CourseGradingTest(CourseTestCase): self.assertNotIn(original_model['graders'][1], updated_model['graders']) send_signal.assert_has_calls([ # once for the POST - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), # once for the DELETE - mock.call(sender=CourseGradingModel, user_id=self.user.id, course_id=self.course.id), + mock.call(sender=CourseGradingModel, user_id=self.user.id, course_key=self.course.id), ]) def setup_test_set_get_section_grader_ajax(self): diff --git a/cms/djangoapps/models/settings/course_grading.py b/cms/djangoapps/models/settings/course_grading.py index d3c91230fb..df01571306 100644 --- a/cms/djangoapps/models/settings/course_grading.py +++ b/cms/djangoapps/models/settings/course_grading.py @@ -265,7 +265,7 @@ def _grading_event_and_signal(course_key, user_id): "event_transaction_type": GRADING_POLICY_CHANGED_EVENT_TYPE, } tracker.emit(name, data) - GRADING_POLICY_CHANGED.send(sender=CourseGradingModel, user_id=user_id, course_id=course_key) + GRADING_POLICY_CHANGED.send(sender=CourseGradingModel, user_id=user_id, course_key=course_key) def hash_grading_policy(grading_policy): diff --git a/cms/envs/aws.py b/cms/envs/aws.py index 004f4f16c5..e64dbbf08a 100644 --- a/cms/envs/aws.py +++ b/cms/envs/aws.py @@ -400,6 +400,7 @@ ALTERNATE_QUEUES = [ DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.') for alternate in ALTERNATE_QUEUE_ENVS ] + CELERY_QUEUES.update( { alternate: {} @@ -408,6 +409,9 @@ CELERY_QUEUES.update( } ) +# Queue to use for updating grades due to grading policy change +POLICY_CHANGE_GRADES_ROUTING_KEY = ENV_TOKENS.get('POLICY_CHANGE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE) + # Event tracking TRACKING_BACKENDS.update(AUTH_TOKENS.get("TRACKING_BACKENDS", {})) EVENT_TRACKING_BACKENDS['tracking_logs']['OPTIONS']['backends'].update(AUTH_TOKENS.get("EVENT_TRACKING_BACKENDS", {})) diff --git a/cms/envs/common.py b/cms/envs/common.py index 5597e7ce1a..f3dd1f9a11 100644 --- a/cms/envs/common.py +++ b/cms/envs/common.py @@ -1339,6 +1339,9 @@ COURSE_CATALOG_API_URL = None # Queue to use for updating persistent grades RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE +# Queue to use for updating grades due to grading policy change +POLICY_CHANGE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE + ############## Settings for CourseGraph ############################ COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE diff --git a/lms/djangoapps/grades/tasks.py b/lms/djangoapps/grades/tasks.py index e03b407641..61a38ac835 100644 --- a/lms/djangoapps/grades/tasks.py +++ b/lms/djangoapps/grades/tasks.py @@ -51,19 +51,22 @@ class _BaseTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-m abstract = True -@task(base=_BaseTask) +@task(base=_BaseTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY) def compute_all_grades_for_course(**kwargs): """ Compute grades for all students in the specified course. Kicks off a series of compute_grades_for_course_v2 tasks to cover all of the students in the course. """ - for course_key, offset, batch_size in _course_task_args( - course_key=kwargs.pop('course_key'), - kwargs=kwargs - ): - task_options = {'course_key': course_key, 'offset': offset, 'batch_size': batch_size} - compute_grades_for_course_v2.apply_async(kwargs=kwargs, **task_options) + course_key = CourseKey.from_string(kwargs.pop('course_key')) + for course_key_string, offset, batch_size in _course_task_args(course_key=course_key, **kwargs): + kwargs.update({ + 'course_key': course_key_string, + 'offset': offset, + 'batch_size': batch_size, + 'routing_key': settings.POLICY_CHANGE_GRADES_ROUTING_KEY, + }) + compute_grades_for_course_v2.apply_async(kwargs=kwargs) @task(base=_BaseTask, bind=True, default_retry_delay=30, max_retries=1) @@ -92,14 +95,11 @@ def compute_grades_for_course_v2(self, **kwargs): if 'event_transaction_type' in kwargs: set_event_transaction_type(kwargs['event_transaction_type']) - course_key = kwargs.pop('course_key') - offset = kwargs.pop('offset') - batch_size = kwargs.pop('batch_size') - estimate_first_attempted = kwargs.pop('estimate_first_attempted', False) - if estimate_first_attempted: + if kwargs.get('estimate_first_attempted'): waffle().override_for_request(ESTIMATE_FIRST_ATTEMPTED, True) + try: - return compute_grades_for_course(course_key, offset, batch_size) + return compute_grades_for_course(kwargs['course_key'], kwargs['offset'], kwargs['batch_size']) except Exception as exc: # pylint: disable=broad-except raise self.retry(kwargs=kwargs, exc=exc) @@ -113,7 +113,6 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli limited to at most students, starting from the specified offset. """ - course = courses.get_course_by_id(CourseKey.from_string(course_key)) enrollments = CourseEnrollment.objects.filter(course_id=course.id).order_by('created') student_iter = (enrollment.user for enrollment in enrollments[offset:offset + batch_size]) diff --git a/lms/envs/aws.py b/lms/envs/aws.py index beec822de9..8fa6c50b1e 100644 --- a/lms/envs/aws.py +++ b/lms/envs/aws.py @@ -272,6 +272,9 @@ BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = ENV_TOKENS.get('BULK_EMAIL_ROUTING_KEY_SMALL # Queue to use for updating persistent grades RECALCULATE_GRADES_ROUTING_KEY = ENV_TOKENS.get('RECALCULATE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE) +# Queue to use for updating grades due to grading policy change +POLICY_CHANGE_GRADES_ROUTING_KEY = ENV_TOKENS.get('POLICY_CHANGE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE) + # Message expiry time in seconds CELERY_EVENT_QUEUE_TTL = ENV_TOKENS.get('CELERY_EVENT_QUEUE_TTL', None) diff --git a/lms/envs/common.py b/lms/envs/common.py index 3be2b78cb7..5f3ab59669 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -1927,6 +1927,9 @@ BULK_EMAIL_RETRY_DELAY_BETWEEN_SENDS = 0.02 # Queue to use for updating persistent grades RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE +# Queue to use for updating grades due to grading policy change +POLICY_CHANGE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE + ############################# Email Opt In #################################### # Minimum age for organization-wide email opt in diff --git a/openedx/core/lib/celery/routers.py b/openedx/core/lib/celery/routers.py index dd3dd7823f..4d75d93612 100644 --- a/openedx/core/lib/celery/routers.py +++ b/openedx/core/lib/celery/routers.py @@ -21,21 +21,32 @@ class AlternateEnvironmentRouter(object): @abstractproperty def alternate_env_tasks(self): """ - Defines the task -> alternate worker environment queue to be used when routing. + Defines the task -> alternate worker environment to be used when routing. Subclasses must override this property with their own specific mappings. """ return {} + @property + def explicit_queues(self): + """ + Defines the task -> alternate worker queue to be used when routing. + """ + return {} + def route_for_task(self, task, args=None, kwargs=None): # pylint: disable=unused-argument """ Celery-defined method allowing for custom routing logic. If None is returned from this method, default routing logic is used. """ + if task in self.explicit_queues: + return self.explicit_queues[task] + alternate_env = self.alternate_env_tasks.get(task, None) if alternate_env: return self.ensure_queue_env(alternate_env) + return None def ensure_queue_env(self, desired_env):