diff --git a/lms/djangoapps/instructor_task/api.py b/lms/djangoapps/instructor_task/api.py index 5ad5d4196e..587781ab82 100644 --- a/lms/djangoapps/instructor_task/api.py +++ b/lms/djangoapps/instructor_task/api.py @@ -8,24 +8,29 @@ arguments. """ +import datetime import hashlib +import logging from collections import Counter +import pytz from celery.states import READY_STATES from common.djangoapps.util import milestones_helpers from lms.djangoapps.bulk_email.models import CourseEmail from lms.djangoapps.certificates.models import CertificateGenerationHistory from lms.djangoapps.instructor_task.api_helper import ( + QueueConnectionError, check_arguments_for_overriding, check_arguments_for_rescoring, check_entrance_exam_problems_for_rescoring, encode_entrance_exam_and_student_input, encode_problem_and_student_input, schedule_task, - submit_task + submit_task, + submit_scheduled_task, ) -from lms.djangoapps.instructor_task.models import InstructorTask +from lms.djangoapps.instructor_task.models import InstructorTask, InstructorTaskSchedule, SCHEDULED from lms.djangoapps.instructor_task.tasks import ( calculate_grades_csv, calculate_may_enroll_csv, @@ -48,6 +53,8 @@ from lms.djangoapps.instructor_task.tasks import ( ) from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order +log = logging.getLogger(__name__) + class SpecificStudentIdMissingError(Exception): """ @@ -565,3 +572,18 @@ def generate_anonymous_ids(request, course_key): task_key = "" return submit_task(request, task_type, task_class, course_key, task_input, task_key) + + +def process_scheduled_tasks(): + """ + Utility function that retrieves tasks whose schedules have elapsed and should be processed. Only retrieves + instructor tasks that are in the `SCHEDULED` state. Then submits these tasks for processing by Celery. + """ + now = datetime.datetime.now(pytz.utc) + due_schedules = InstructorTaskSchedule.objects.filter(task__task_state=SCHEDULED).filter(task_due__lte=now) + for schedule in due_schedules: + try: + log.info(f"Attempting to queue scheduled task with id '{schedule.task.id}'") + submit_scheduled_task(schedule) + except QueueConnectionError as exc: + log.error(f"Error processing scheduled task with task id '{schedule.task.id}': {exc}") diff --git a/lms/djangoapps/instructor_task/api_helper.py b/lms/djangoapps/instructor_task/api_helper.py index d85e222863..06e09372a0 100644 --- a/lms/djangoapps/instructor_task/api_helper.py +++ b/lms/djangoapps/instructor_task/api_helper.py @@ -14,11 +14,14 @@ from celery.result import AsyncResult from celery.states import FAILURE, READY_STATES, REVOKED, SUCCESS from django.utils.translation import gettext as _ from opaque_keys.edx.keys import UsageKey +from xmodule.modulestore.django import modulestore from common.djangoapps.util.db import outer_atomic from lms.djangoapps.courseware.courses import get_problems_in_section +from lms.djangoapps.instructor_task.data import InstructorTaskTypes from lms.djangoapps.instructor_task.models import PROGRESS, SCHEDULED, InstructorTask, InstructorTaskSchedule -from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order + +from lms.djangoapps.instructor_task.tasks import send_bulk_course_email log = logging.getLogger(__name__) @@ -266,6 +269,23 @@ def _get_async_result(task_id): return AsyncResult(task_id) +def _determine_task_class(task_type): + """ + Utility function used when processing scheduled instructor tasks. This function uses the type of an instructor task + to determine the associated Celery task function that will be used when processing the task via Celery. + + Args: + task_type (String): A string describing the type of task. + + Returns: + A Python function associated with the scheduled instructor task used during task execution. + """ + if task_type == InstructorTaskTypes.BULK_COURSE_EMAIL: + return send_bulk_course_email + + return None + + def get_updated_instructor_task(task_id): """ Returns InstructorTask object corresponding to a given `task_id`. @@ -491,3 +511,30 @@ def schedule_task(request, task_type, course_key, task_input, task_key, schedule # Set any orphaned instructor tasks to the FAILURE state. if instructor_task: _handle_instructor_task_failure(instructor_task, error) + + +def submit_scheduled_task(schedule): + """ + Helper function for submitting a scheduled task due for execution to Celery. + """ + # determine the task_class needed based off the task_type + task_class = _determine_task_class(schedule.task.task_type) + if task_class: + try: + # convert the stored argument data back into a dict from text + task_arguments = json.loads(schedule.task_args) + # turn this into the format Celery expects + task_args = [schedule.task.id, task_arguments] + # submit the task + log.info(f"Submitting scheduled task {schedule.task.id} for processing") + task_class.apply_async(task_args, task_id=schedule.task.task_id) + except Exception as error: # pylint: disable=broad-except + # broad except here to make sure we cast a wide net for tasks with issues that can't be processed + log.error(f"Error submitting scheduled task '{schedule.task.id}' to Celery: {error}") + # handle task failure + _handle_instructor_task_failure(schedule.task, error) + else: + log.warning( + f"Could not submit scheduled instructor task with id '{schedule.task.id}' and task type " + f"'{schedule.task.task_type}'. Could not determine the task class for the request." + ) diff --git a/lms/djangoapps/instructor_task/data.py b/lms/djangoapps/instructor_task/data.py new file mode 100644 index 0000000000..c9da5eda7d --- /dev/null +++ b/lms/djangoapps/instructor_task/data.py @@ -0,0 +1,34 @@ +""" +Public data structures for the instructor_task app. +""" +from enum import Enum + + +class InstructorTaskTypes(str, Enum): + """ + Enum describing the assortment of instructor tasks supported by edx-platform. + """ + BULK_COURSE_EMAIL = "bulk_course_email" + COHORT_STUDENTS = "cohort_students" + COURSE_SURVEY_REPORT = "course_survey_report" + DELETE_PROBLEM_STATE = "delete_problem_state" + DETAILED_ENROLLMENT_REPORT = "detailed_enrollment_report" + EXEC_SUMMARY_REPORT = "exec_summary_report" + EXPORT_ORA2_DATA = "export_ora2_data" + EXPORT_ORA2_SUBMISSION_FILES = "export_ora2_submission_files" + EXPORT_ORA2_SUMMARY = "export_ora2_summary" + GENERATE_ANONYMOUS_IDS_FOR_COURSE = "generate_anonymous_ids_for_course" + GENERATE_CERTIFICATES_ALL_STUDENT = "generate_certificates_all_student" + GENERATE_CERTIFICATES_CERTAIN_STUDENT = "generate_certificates_certain_student" + GENERATE_CERTIFICATES_STUDENT_SET = "generate_certificates_student_set" + GRADE_COURSE = "grade_course" + GRADE_PROBLEMS = "grade_problems" + MAY_ENROLL_INFO_CSV = "may_enroll_info_csv" + OVERRIDE_PROBLEM_SCORE = "override_problem_score" + PROBLEM_RESPONSES_CSV = "problem_responses_csv" + PROCTORED_EXAM_RESULTS_REPORT = "proctored_exam_results_report" + PROFILE_INFO_CSV = "profile_info_csv" + REGENERATE_CERTIFICATES_ALL_STUDENT = "regenerate_certificates_all_student" + RESCORE_PROBLEM = "rescore_problem" + RESCORE_PROBLEM_IF_HIGHER = "rescore_problem_if_higher" + RESET_PROBLEM_ATTEMPTS = "reset_problem_attempts" diff --git a/lms/djangoapps/instructor_task/tests/factories.py b/lms/djangoapps/instructor_task/tests/factories.py index 1cb5b9d881..b69a2a5667 100644 --- a/lms/djangoapps/instructor_task/tests/factories.py +++ b/lms/djangoapps/instructor_task/tests/factories.py @@ -2,7 +2,7 @@ Instructor Task Factory """ - +import datetime import json import factory @@ -11,10 +11,13 @@ from factory.django import DjangoModelFactory from opaque_keys.edx.locator import CourseLocator from common.djangoapps.student.tests.factories import UserFactory as StudentUserFactory -from lms.djangoapps.instructor_task.models import InstructorTask +from lms.djangoapps.instructor_task.models import InstructorTask, InstructorTaskSchedule -class InstructorTaskFactory(DjangoModelFactory): # lint-amnesty, pylint: disable=missing-class-docstring +class InstructorTaskFactory(DjangoModelFactory): + """ + Factory used to create InstructorTask instances in unit tests. + """ class Meta: model = InstructorTask @@ -26,3 +29,15 @@ class InstructorTaskFactory(DjangoModelFactory): # lint-amnesty, pylint: disabl task_state = PENDING task_output = None requester = factory.SubFactory(StudentUserFactory) + + +class InstructorTaskScheduleFactory(DjangoModelFactory): + """ + Factory used to create InstructorTaskSchedule instances in unit tests. + """ + class Meta: + model = InstructorTaskSchedule + + task = factory.SubFactory(InstructorTaskFactory) + task_args = "{}" + task_due = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc) - datetime.timedelta(days=1) diff --git a/lms/djangoapps/instructor_task/tests/test_api.py b/lms/djangoapps/instructor_task/tests/test_api.py index 7df5a357bb..9b8f02699a 100644 --- a/lms/djangoapps/instructor_task/tests/test_api.py +++ b/lms/djangoapps/instructor_task/tests/test_api.py @@ -3,12 +3,16 @@ Test for LMS instructor background task queue management """ import datetime +import json from unittest.mock import MagicMock, Mock, patch +from uuid import uuid4 import pytest import pytz import ddt -from celery.states import FAILURE +from testfixtures import LogCapture +from celery.states import FAILURE, SUCCESS +from xmodule.modulestore.exceptions import ItemNotFoundError from common.djangoapps.student.tests.factories import UserFactory from common.test.utils import normalize_repr @@ -20,6 +24,7 @@ from lms.djangoapps.instructor_task.api import ( generate_certificates_for_students, get_instructor_task_history, get_running_instructor_tasks, + process_scheduled_tasks, regenerate_certificates, submit_bulk_course_email, submit_calculate_may_enroll_csv, @@ -40,9 +45,13 @@ from lms.djangoapps.instructor_task.api import ( generate_anonymous_ids ) from lms.djangoapps.instructor_task.api_helper import AlreadyRunningError, QueueConnectionError -from lms.djangoapps.instructor_task.models import PROGRESS, InstructorTask -from lms.djangoapps.instructor_task.tasks import export_ora2_data, export_ora2_submission_files, \ - generate_anonymous_ids_for_course +from lms.djangoapps.instructor_task.models import PROGRESS, SCHEDULED, InstructorTask +from lms.djangoapps.instructor_task.tasks import ( + export_ora2_data, + export_ora2_submission_files, + generate_anonymous_ids_for_course, +) +from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory, InstructorTaskScheduleFactory from lms.djangoapps.instructor_task.tests.test_base import ( TEST_COURSE_KEY, InstructorTaskCourseTestCase, @@ -50,7 +59,8 @@ from lms.djangoapps.instructor_task.tests.test_base import ( InstructorTaskTestCase, TestReportMixin ) -from xmodule.modulestore.exceptions import ItemNotFoundError # lint-amnesty, pylint: disable=wrong-import-order + +LOG_PATH = 'lms.djangoapps.instructor_task.api' class InstructorTaskReportTest(InstructorTaskTestCase): @@ -233,6 +243,41 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa with pytest.raises(AlreadyRunningError): api_call() + def _generate_scheduled_task(self, task_state=None): + return InstructorTaskFactory.create( + task_type="bulk_course_email", + course_id=self.course.id, + task_input="{'email_id': 1, 'to_option': ['myself']}", + task_key="3416a75f4cea9109507cacd8e2f2aefc", + task_id=str(uuid4()), + task_state=task_state if task_state else SCHEDULED, + task_output=None, + requester=self.instructor + ) + + def _generate_scheduled_task_schedule(self, task, due_date): + return InstructorTaskScheduleFactory.create( + task=task, + task_args=json.dumps(self._generate_task_args()), + task_due=due_date + ) + + def _generate_task_args(self): + """ + Utility function that creates a sample `task_args` value for a scheduled task. + """ + task_args = { + "request_info": { + "username": self.instructor.username, + "user_id": self.instructor.id, + "ip": "127.0.0.1", + "agent": "Mozilla", + "host": "localhost:18000" + }, + "task_id": "622748b3-2831-432e-b519-4fde2706ca59" + } + return task_args + def test_submit_bulk_email_all(self): email_id = self._define_course_email() api_call = lambda: submit_bulk_course_email( @@ -395,6 +440,9 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa @patch("lms.djangoapps.instructor_task.api.schedule_task") @patch("lms.djangoapps.instructor_task.api.submit_task") def test_submit_bulk_course_email_with_schedule(self, mock_submit_task, mock_schedule_task): + """ + A test to determine if the right helper function is being called when a scheduled task is being processed. + """ email_id = self._define_course_email() schedule = datetime.datetime(2030, 8, 15, 8, 15, 12, 0, pytz.utc) submit_bulk_course_email( @@ -405,3 +453,57 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa ) mock_schedule_task.assert_called_once() mock_submit_task.assert_not_called() + + @patch("lms.djangoapps.instructor_task.api.submit_scheduled_task") + def test_process_scheduled_tasks(self, mock_submit_scheduled_task): + """ + A test to verify the functionality of the `process_scheduled_tasks` function. This function determines which + scheduled instructor tasks are due for execution. + + This test generates three scheduled tasks; one that has been processed, one that is due for processing, and one + that is due in the future. In this test, one only of these tasks should be eligible for processing. + """ + base_date = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc) + executed_instructor_task = self._generate_scheduled_task(task_state=SUCCESS) + executed_instructor_task_due_date = base_date - datetime.timedelta(days=5) + self._generate_scheduled_task_schedule(executed_instructor_task, executed_instructor_task_due_date) + + due_instructor_task = self._generate_scheduled_task() + due_instructor_task_due_date = base_date - datetime.timedelta(days=1) + due_instructor_task_schedule = self._generate_scheduled_task_schedule( + due_instructor_task, + due_instructor_task_due_date + ) + + future_instructor_task = self._generate_scheduled_task() + future_instructor_task_due_date = base_date + datetime.timedelta(days=15) + self._generate_scheduled_task_schedule(future_instructor_task, future_instructor_task_due_date) + + expected_messages = [ + f"Attempting to queue scheduled task with id '{due_instructor_task.id}'" + ] + + with LogCapture() as log: + process_scheduled_tasks() + + mock_submit_scheduled_task.assert_called_once_with(due_instructor_task_schedule) + log.check_present((LOG_PATH, "INFO", expected_messages[0]),) + + @patch("lms.djangoapps.instructor_task.api.submit_scheduled_task", side_effect=QueueConnectionError("blammo!")) + def test_process_scheduled_tasks_expect_error(self, mock_scheduled_task): + """ + A test that verifies the behavior of the `process_scheduled_tasks` function when there is an error processing + the request. + """ + base_date = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc) + due_instructor_task = self._generate_scheduled_task() + due_instructor_task_due_date = base_date - datetime.timedelta(days=1) + self._generate_scheduled_task_schedule(due_instructor_task, due_instructor_task_due_date) + expected_messages = [ + f"Error processing scheduled task with task id '{due_instructor_task.id}': blammo!", + ] + + with LogCapture() as log: + process_scheduled_tasks() + + log.check_present((LOG_PATH, "ERROR", expected_messages[0]),) diff --git a/lms/djangoapps/instructor_task/tests/test_api_helper.py b/lms/djangoapps/instructor_task/tests/test_api_helper.py index dcba37bb46..cbc566944b 100644 --- a/lms/djangoapps/instructor_task/tests/test_api_helper.py +++ b/lms/djangoapps/instructor_task/tests/test_api_helper.py @@ -5,17 +5,21 @@ import datetime import hashlib import json from unittest.mock import patch -from testfixtures import LogCapture +from uuid import uuid4 +from testfixtures import LogCapture from celery.states import FAILURE from common.djangoapps.student.tests.factories import UserFactory from lms.djangoapps.bulk_email.api import create_course_email from lms.djangoapps.bulk_email.data import BulkEmailTargetChoices -from lms.djangoapps.instructor_task.api_helper import QueueConnectionError, schedule_task +from lms.djangoapps.instructor_task.api_helper import QueueConnectionError, schedule_task, submit_scheduled_task from lms.djangoapps.instructor_task.models import SCHEDULED, InstructorTask, InstructorTaskSchedule +from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory, InstructorTaskScheduleFactory from lms.djangoapps.instructor_task.tests.test_base import InstructorTaskCourseTestCase +LOG_PATH = "lms.djangoapps.instructor_task.api_helper" + class ScheduledBulkEmailInstructorTaskTests(InstructorTaskCourseTestCase): """ @@ -132,3 +136,99 @@ class ScheduledBulkEmailInstructorTaskTests(InstructorTaskCourseTestCase): task = InstructorTask.objects.get(course_id=self.course.id, task_key=self.task_key) assert task.task_state == FAILURE self._verify_log_messages(expected_messages, log) + + +class ScheduledInstructorTaskSubmissionTests(InstructorTaskCourseTestCase): + """ + Unit tests scheduled instructor task functionality. Verifies behavior around retrieving and submission of instructor + tasks due for execution. + """ + def setUp(self): + super().setUp() + self.initialize_course() + self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org") + # create an instructor task instance + task_id = str(uuid4()) + self.task = InstructorTaskFactory.create( + task_type="bulk_course_email", + course_id=self.course.id, + task_input="{'email_id': 41, 'to_option': ['myself']}", + task_key="3416a75f4cea9109507cacd8e2f2aefc", + task_id=task_id, + task_state=SCHEDULED, + task_output=None, + requester=self.instructor + ) + # associate the task with a instructor task schedule instance + task_args = { + "request_info": { + "username": self.instructor.username, + "user_id": self.instructor.id, + "ip": "192.168.1.100", + "agent": "Mozilla", + "host": "localhost:18000" + }, + "task_id": self.task.task_id + } + self.task_schedule = InstructorTaskScheduleFactory.create( + task=self.task, + task_args=json.dumps(task_args), + ) + + @patch("lms.djangoapps.instructor_task.api_helper.send_bulk_course_email.apply_async") + def test_submit_scheduled_instructor_task(self, mock_task_execution): + """ + A test that verifies the behavior of submitting a scheduled instructor task for execution. + """ + schedule = InstructorTaskSchedule.objects.get(task=self.task.id) + expected_task_arguments = json.loads(schedule.task_args) + expected_task_args = [schedule.task.id, expected_task_arguments] + expected_messages = [ + f"Submitting scheduled task {schedule.task.id} for processing", + ] + + with LogCapture() as log: + submit_scheduled_task(schedule) + + mock_task_execution.assert_called_once() + mock_task_execution.assert_called_with(expected_task_args, task_id=schedule.task.task_id) + log.check_present((LOG_PATH, "INFO", expected_messages[0])) + + def test_submit_scheduled_instructor_task_bad_task_class(self): + """ + A test that verifies behavior when we can't determine the task class to use when submitting our scheduled task + for execution. + """ + schedule = InstructorTaskSchedule.objects.get(task=self.task.id) + task = schedule.task + task.task_type = "task_without_scheduling_support" + task.save() + expected_messages = [ + f"Could not submit scheduled instructor task with id '{schedule.task.id}' and task type " + f"'{task.task_type}'. Could not determine the task class for the request.", + ] + + with LogCapture() as log: + submit_scheduled_task(schedule) + + log.check_present((LOG_PATH, "WARNING", expected_messages[0])) + + def test_submit_scheduled_instructor_task_with_error(self): + """ + A test that verifies our task handling if an error occurs during task submission. Verifies that the task failure + is correctly handled. + """ + schedule = InstructorTaskSchedule.objects.get(task=self.task.id) + schedule.task_args = "{malformed JSON data}" + schedule.save() + expected_messages = [ + f"Error submitting scheduled task '{schedule.task.id}' to Celery: Expecting property name enclosed in " + "double quotes: line 1 column 2 (char 1)", + ] + + with self.assertRaises(QueueConnectionError): + with LogCapture() as log: + submit_scheduled_task(schedule) + + assert schedule.task.task_state == FAILURE + log.check_present((LOG_PATH, "ERROR", expected_messages[0]))