feat: add support for processing scheduled instructor tasks

[MICROBA-1507]

* adds new functionality for retrieving and submitting scheduled tasks to Celery
* add an InstructorTaskSchedule factory for unit testing
* adds unit tests for processing scheduled tasks
This commit is contained in:
Justin Hynes
2022-04-01 11:10:19 -04:00
parent b97af6ac6e
commit bec97653b5
6 changed files with 333 additions and 13 deletions

View File

@@ -8,24 +8,29 @@ arguments.
"""
import datetime
import hashlib
import logging
from collections import Counter
import pytz
from celery.states import READY_STATES
from common.djangoapps.util import milestones_helpers
from lms.djangoapps.bulk_email.models import CourseEmail
from lms.djangoapps.certificates.models import CertificateGenerationHistory
from lms.djangoapps.instructor_task.api_helper import (
QueueConnectionError,
check_arguments_for_overriding,
check_arguments_for_rescoring,
check_entrance_exam_problems_for_rescoring,
encode_entrance_exam_and_student_input,
encode_problem_and_student_input,
schedule_task,
submit_task
submit_task,
submit_scheduled_task,
)
from lms.djangoapps.instructor_task.models import InstructorTask
from lms.djangoapps.instructor_task.models import InstructorTask, InstructorTaskSchedule, SCHEDULED
from lms.djangoapps.instructor_task.tasks import (
calculate_grades_csv,
calculate_may_enroll_csv,
@@ -48,6 +53,8 @@ from lms.djangoapps.instructor_task.tasks import (
)
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
log = logging.getLogger(__name__)
class SpecificStudentIdMissingError(Exception):
"""
@@ -565,3 +572,18 @@ def generate_anonymous_ids(request, course_key):
task_key = ""
return submit_task(request, task_type, task_class, course_key, task_input, task_key)
def process_scheduled_tasks():
"""
Utility function that retrieves tasks whose schedules have elapsed and should be processed. Only retrieves
instructor tasks that are in the `SCHEDULED` state. Then submits these tasks for processing by Celery.
"""
now = datetime.datetime.now(pytz.utc)
due_schedules = InstructorTaskSchedule.objects.filter(task__task_state=SCHEDULED).filter(task_due__lte=now)
for schedule in due_schedules:
try:
log.info(f"Attempting to queue scheduled task with id '{schedule.task.id}'")
submit_scheduled_task(schedule)
except QueueConnectionError as exc:
log.error(f"Error processing scheduled task with task id '{schedule.task.id}': {exc}")

View File

@@ -14,11 +14,14 @@ from celery.result import AsyncResult
from celery.states import FAILURE, READY_STATES, REVOKED, SUCCESS
from django.utils.translation import gettext as _
from opaque_keys.edx.keys import UsageKey
from xmodule.modulestore.django import modulestore
from common.djangoapps.util.db import outer_atomic
from lms.djangoapps.courseware.courses import get_problems_in_section
from lms.djangoapps.instructor_task.data import InstructorTaskTypes
from lms.djangoapps.instructor_task.models import PROGRESS, SCHEDULED, InstructorTask, InstructorTaskSchedule
from xmodule.modulestore.django import modulestore # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.instructor_task.tasks import send_bulk_course_email
log = logging.getLogger(__name__)
@@ -266,6 +269,23 @@ def _get_async_result(task_id):
return AsyncResult(task_id)
def _determine_task_class(task_type):
"""
Utility function used when processing scheduled instructor tasks. This function uses the type of an instructor task
to determine the associated Celery task function that will be used when processing the task via Celery.
Args:
task_type (String): A string describing the type of task.
Returns:
A Python function associated with the scheduled instructor task used during task execution.
"""
if task_type == InstructorTaskTypes.BULK_COURSE_EMAIL:
return send_bulk_course_email
return None
def get_updated_instructor_task(task_id):
"""
Returns InstructorTask object corresponding to a given `task_id`.
@@ -491,3 +511,30 @@ def schedule_task(request, task_type, course_key, task_input, task_key, schedule
# Set any orphaned instructor tasks to the FAILURE state.
if instructor_task:
_handle_instructor_task_failure(instructor_task, error)
def submit_scheduled_task(schedule):
"""
Helper function for submitting a scheduled task due for execution to Celery.
"""
# determine the task_class needed based off the task_type
task_class = _determine_task_class(schedule.task.task_type)
if task_class:
try:
# convert the stored argument data back into a dict from text
task_arguments = json.loads(schedule.task_args)
# turn this into the format Celery expects
task_args = [schedule.task.id, task_arguments]
# submit the task
log.info(f"Submitting scheduled task {schedule.task.id} for processing")
task_class.apply_async(task_args, task_id=schedule.task.task_id)
except Exception as error: # pylint: disable=broad-except
# broad except here to make sure we cast a wide net for tasks with issues that can't be processed
log.error(f"Error submitting scheduled task '{schedule.task.id}' to Celery: {error}")
# handle task failure
_handle_instructor_task_failure(schedule.task, error)
else:
log.warning(
f"Could not submit scheduled instructor task with id '{schedule.task.id}' and task type "
f"'{schedule.task.task_type}'. Could not determine the task class for the request."
)

View File

@@ -0,0 +1,34 @@
"""
Public data structures for the instructor_task app.
"""
from enum import Enum
class InstructorTaskTypes(str, Enum):
"""
Enum describing the assortment of instructor tasks supported by edx-platform.
"""
BULK_COURSE_EMAIL = "bulk_course_email"
COHORT_STUDENTS = "cohort_students"
COURSE_SURVEY_REPORT = "course_survey_report"
DELETE_PROBLEM_STATE = "delete_problem_state"
DETAILED_ENROLLMENT_REPORT = "detailed_enrollment_report"
EXEC_SUMMARY_REPORT = "exec_summary_report"
EXPORT_ORA2_DATA = "export_ora2_data"
EXPORT_ORA2_SUBMISSION_FILES = "export_ora2_submission_files"
EXPORT_ORA2_SUMMARY = "export_ora2_summary"
GENERATE_ANONYMOUS_IDS_FOR_COURSE = "generate_anonymous_ids_for_course"
GENERATE_CERTIFICATES_ALL_STUDENT = "generate_certificates_all_student"
GENERATE_CERTIFICATES_CERTAIN_STUDENT = "generate_certificates_certain_student"
GENERATE_CERTIFICATES_STUDENT_SET = "generate_certificates_student_set"
GRADE_COURSE = "grade_course"
GRADE_PROBLEMS = "grade_problems"
MAY_ENROLL_INFO_CSV = "may_enroll_info_csv"
OVERRIDE_PROBLEM_SCORE = "override_problem_score"
PROBLEM_RESPONSES_CSV = "problem_responses_csv"
PROCTORED_EXAM_RESULTS_REPORT = "proctored_exam_results_report"
PROFILE_INFO_CSV = "profile_info_csv"
REGENERATE_CERTIFICATES_ALL_STUDENT = "regenerate_certificates_all_student"
RESCORE_PROBLEM = "rescore_problem"
RESCORE_PROBLEM_IF_HIGHER = "rescore_problem_if_higher"
RESET_PROBLEM_ATTEMPTS = "reset_problem_attempts"

View File

@@ -2,7 +2,7 @@
Instructor Task Factory
"""
import datetime
import json
import factory
@@ -11,10 +11,13 @@ from factory.django import DjangoModelFactory
from opaque_keys.edx.locator import CourseLocator
from common.djangoapps.student.tests.factories import UserFactory as StudentUserFactory
from lms.djangoapps.instructor_task.models import InstructorTask
from lms.djangoapps.instructor_task.models import InstructorTask, InstructorTaskSchedule
class InstructorTaskFactory(DjangoModelFactory): # lint-amnesty, pylint: disable=missing-class-docstring
class InstructorTaskFactory(DjangoModelFactory):
"""
Factory used to create InstructorTask instances in unit tests.
"""
class Meta:
model = InstructorTask
@@ -26,3 +29,15 @@ class InstructorTaskFactory(DjangoModelFactory): # lint-amnesty, pylint: disabl
task_state = PENDING
task_output = None
requester = factory.SubFactory(StudentUserFactory)
class InstructorTaskScheduleFactory(DjangoModelFactory):
"""
Factory used to create InstructorTaskSchedule instances in unit tests.
"""
class Meta:
model = InstructorTaskSchedule
task = factory.SubFactory(InstructorTaskFactory)
task_args = "{}"
task_due = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc) - datetime.timedelta(days=1)

View File

@@ -3,12 +3,16 @@ Test for LMS instructor background task queue management
"""
import datetime
import json
from unittest.mock import MagicMock, Mock, patch
from uuid import uuid4
import pytest
import pytz
import ddt
from celery.states import FAILURE
from testfixtures import LogCapture
from celery.states import FAILURE, SUCCESS
from xmodule.modulestore.exceptions import ItemNotFoundError
from common.djangoapps.student.tests.factories import UserFactory
from common.test.utils import normalize_repr
@@ -20,6 +24,7 @@ from lms.djangoapps.instructor_task.api import (
generate_certificates_for_students,
get_instructor_task_history,
get_running_instructor_tasks,
process_scheduled_tasks,
regenerate_certificates,
submit_bulk_course_email,
submit_calculate_may_enroll_csv,
@@ -40,9 +45,13 @@ from lms.djangoapps.instructor_task.api import (
generate_anonymous_ids
)
from lms.djangoapps.instructor_task.api_helper import AlreadyRunningError, QueueConnectionError
from lms.djangoapps.instructor_task.models import PROGRESS, InstructorTask
from lms.djangoapps.instructor_task.tasks import export_ora2_data, export_ora2_submission_files, \
generate_anonymous_ids_for_course
from lms.djangoapps.instructor_task.models import PROGRESS, SCHEDULED, InstructorTask
from lms.djangoapps.instructor_task.tasks import (
export_ora2_data,
export_ora2_submission_files,
generate_anonymous_ids_for_course,
)
from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory, InstructorTaskScheduleFactory
from lms.djangoapps.instructor_task.tests.test_base import (
TEST_COURSE_KEY,
InstructorTaskCourseTestCase,
@@ -50,7 +59,8 @@ from lms.djangoapps.instructor_task.tests.test_base import (
InstructorTaskTestCase,
TestReportMixin
)
from xmodule.modulestore.exceptions import ItemNotFoundError # lint-amnesty, pylint: disable=wrong-import-order
LOG_PATH = 'lms.djangoapps.instructor_task.api'
class InstructorTaskReportTest(InstructorTaskTestCase):
@@ -233,6 +243,41 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa
with pytest.raises(AlreadyRunningError):
api_call()
def _generate_scheduled_task(self, task_state=None):
return InstructorTaskFactory.create(
task_type="bulk_course_email",
course_id=self.course.id,
task_input="{'email_id': 1, 'to_option': ['myself']}",
task_key="3416a75f4cea9109507cacd8e2f2aefc",
task_id=str(uuid4()),
task_state=task_state if task_state else SCHEDULED,
task_output=None,
requester=self.instructor
)
def _generate_scheduled_task_schedule(self, task, due_date):
return InstructorTaskScheduleFactory.create(
task=task,
task_args=json.dumps(self._generate_task_args()),
task_due=due_date
)
def _generate_task_args(self):
"""
Utility function that creates a sample `task_args` value for a scheduled task.
"""
task_args = {
"request_info": {
"username": self.instructor.username,
"user_id": self.instructor.id,
"ip": "127.0.0.1",
"agent": "Mozilla",
"host": "localhost:18000"
},
"task_id": "622748b3-2831-432e-b519-4fde2706ca59"
}
return task_args
def test_submit_bulk_email_all(self):
email_id = self._define_course_email()
api_call = lambda: submit_bulk_course_email(
@@ -395,6 +440,9 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa
@patch("lms.djangoapps.instructor_task.api.schedule_task")
@patch("lms.djangoapps.instructor_task.api.submit_task")
def test_submit_bulk_course_email_with_schedule(self, mock_submit_task, mock_schedule_task):
"""
A test to determine if the right helper function is being called when a scheduled task is being processed.
"""
email_id = self._define_course_email()
schedule = datetime.datetime(2030, 8, 15, 8, 15, 12, 0, pytz.utc)
submit_bulk_course_email(
@@ -405,3 +453,57 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa
)
mock_schedule_task.assert_called_once()
mock_submit_task.assert_not_called()
@patch("lms.djangoapps.instructor_task.api.submit_scheduled_task")
def test_process_scheduled_tasks(self, mock_submit_scheduled_task):
"""
A test to verify the functionality of the `process_scheduled_tasks` function. This function determines which
scheduled instructor tasks are due for execution.
This test generates three scheduled tasks; one that has been processed, one that is due for processing, and one
that is due in the future. In this test, one only of these tasks should be eligible for processing.
"""
base_date = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc)
executed_instructor_task = self._generate_scheduled_task(task_state=SUCCESS)
executed_instructor_task_due_date = base_date - datetime.timedelta(days=5)
self._generate_scheduled_task_schedule(executed_instructor_task, executed_instructor_task_due_date)
due_instructor_task = self._generate_scheduled_task()
due_instructor_task_due_date = base_date - datetime.timedelta(days=1)
due_instructor_task_schedule = self._generate_scheduled_task_schedule(
due_instructor_task,
due_instructor_task_due_date
)
future_instructor_task = self._generate_scheduled_task()
future_instructor_task_due_date = base_date + datetime.timedelta(days=15)
self._generate_scheduled_task_schedule(future_instructor_task, future_instructor_task_due_date)
expected_messages = [
f"Attempting to queue scheduled task with id '{due_instructor_task.id}'"
]
with LogCapture() as log:
process_scheduled_tasks()
mock_submit_scheduled_task.assert_called_once_with(due_instructor_task_schedule)
log.check_present((LOG_PATH, "INFO", expected_messages[0]),)
@patch("lms.djangoapps.instructor_task.api.submit_scheduled_task", side_effect=QueueConnectionError("blammo!"))
def test_process_scheduled_tasks_expect_error(self, mock_scheduled_task):
"""
A test that verifies the behavior of the `process_scheduled_tasks` function when there is an error processing
the request.
"""
base_date = datetime.datetime.now().replace(tzinfo=datetime.timezone.utc)
due_instructor_task = self._generate_scheduled_task()
due_instructor_task_due_date = base_date - datetime.timedelta(days=1)
self._generate_scheduled_task_schedule(due_instructor_task, due_instructor_task_due_date)
expected_messages = [
f"Error processing scheduled task with task id '{due_instructor_task.id}': blammo!",
]
with LogCapture() as log:
process_scheduled_tasks()
log.check_present((LOG_PATH, "ERROR", expected_messages[0]),)

View File

@@ -5,17 +5,21 @@ import datetime
import hashlib
import json
from unittest.mock import patch
from testfixtures import LogCapture
from uuid import uuid4
from testfixtures import LogCapture
from celery.states import FAILURE
from common.djangoapps.student.tests.factories import UserFactory
from lms.djangoapps.bulk_email.api import create_course_email
from lms.djangoapps.bulk_email.data import BulkEmailTargetChoices
from lms.djangoapps.instructor_task.api_helper import QueueConnectionError, schedule_task
from lms.djangoapps.instructor_task.api_helper import QueueConnectionError, schedule_task, submit_scheduled_task
from lms.djangoapps.instructor_task.models import SCHEDULED, InstructorTask, InstructorTaskSchedule
from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory, InstructorTaskScheduleFactory
from lms.djangoapps.instructor_task.tests.test_base import InstructorTaskCourseTestCase
LOG_PATH = "lms.djangoapps.instructor_task.api_helper"
class ScheduledBulkEmailInstructorTaskTests(InstructorTaskCourseTestCase):
"""
@@ -132,3 +136,99 @@ class ScheduledBulkEmailInstructorTaskTests(InstructorTaskCourseTestCase):
task = InstructorTask.objects.get(course_id=self.course.id, task_key=self.task_key)
assert task.task_state == FAILURE
self._verify_log_messages(expected_messages, log)
class ScheduledInstructorTaskSubmissionTests(InstructorTaskCourseTestCase):
"""
Unit tests scheduled instructor task functionality. Verifies behavior around retrieving and submission of instructor
tasks due for execution.
"""
def setUp(self):
super().setUp()
self.initialize_course()
self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org")
# create an instructor task instance
task_id = str(uuid4())
self.task = InstructorTaskFactory.create(
task_type="bulk_course_email",
course_id=self.course.id,
task_input="{'email_id': 41, 'to_option': ['myself']}",
task_key="3416a75f4cea9109507cacd8e2f2aefc",
task_id=task_id,
task_state=SCHEDULED,
task_output=None,
requester=self.instructor
)
# associate the task with a instructor task schedule instance
task_args = {
"request_info": {
"username": self.instructor.username,
"user_id": self.instructor.id,
"ip": "192.168.1.100",
"agent": "Mozilla",
"host": "localhost:18000"
},
"task_id": self.task.task_id
}
self.task_schedule = InstructorTaskScheduleFactory.create(
task=self.task,
task_args=json.dumps(task_args),
)
@patch("lms.djangoapps.instructor_task.api_helper.send_bulk_course_email.apply_async")
def test_submit_scheduled_instructor_task(self, mock_task_execution):
"""
A test that verifies the behavior of submitting a scheduled instructor task for execution.
"""
schedule = InstructorTaskSchedule.objects.get(task=self.task.id)
expected_task_arguments = json.loads(schedule.task_args)
expected_task_args = [schedule.task.id, expected_task_arguments]
expected_messages = [
f"Submitting scheduled task {schedule.task.id} for processing",
]
with LogCapture() as log:
submit_scheduled_task(schedule)
mock_task_execution.assert_called_once()
mock_task_execution.assert_called_with(expected_task_args, task_id=schedule.task.task_id)
log.check_present((LOG_PATH, "INFO", expected_messages[0]))
def test_submit_scheduled_instructor_task_bad_task_class(self):
"""
A test that verifies behavior when we can't determine the task class to use when submitting our scheduled task
for execution.
"""
schedule = InstructorTaskSchedule.objects.get(task=self.task.id)
task = schedule.task
task.task_type = "task_without_scheduling_support"
task.save()
expected_messages = [
f"Could not submit scheduled instructor task with id '{schedule.task.id}' and task type "
f"'{task.task_type}'. Could not determine the task class for the request.",
]
with LogCapture() as log:
submit_scheduled_task(schedule)
log.check_present((LOG_PATH, "WARNING", expected_messages[0]))
def test_submit_scheduled_instructor_task_with_error(self):
"""
A test that verifies our task handling if an error occurs during task submission. Verifies that the task failure
is correctly handled.
"""
schedule = InstructorTaskSchedule.objects.get(task=self.task.id)
schedule.task_args = "{malformed JSON data}"
schedule.save()
expected_messages = [
f"Error submitting scheduled task '{schedule.task.id}' to Celery: Expecting property name enclosed in "
"double quotes: line 1 column 2 (char 1)",
]
with self.assertRaises(QueueConnectionError):
with LogCapture() as log:
submit_scheduled_task(schedule)
assert schedule.task.task_state == FAILURE
log.check_present((LOG_PATH, "ERROR", expected_messages[0]))