Merge pull request #30448 from openedx/jhynes/microba-1512_mgmt_cmd
feat: Add management command for processing scheduled instructor tasks
This commit is contained in:
@@ -575,13 +575,14 @@ def generate_anonymous_ids(request, course_key):
|
||||
return submit_task(request, task_type, task_class, course_key, task_input, task_key)
|
||||
|
||||
|
||||
def process_scheduled_tasks():
|
||||
def process_scheduled_instructor_tasks():
|
||||
"""
|
||||
Utility function that retrieves tasks whose schedules have elapsed and should be processed. Only retrieves
|
||||
instructor tasks that are in the `SCHEDULED` state. Then submits these tasks for processing by Celery.
|
||||
"""
|
||||
now = datetime.datetime.now(pytz.utc)
|
||||
due_schedules = InstructorTaskSchedule.objects.filter(task__task_state=SCHEDULED).filter(task_due__lte=now)
|
||||
log.info(f"Retrieved {due_schedules.count()} scheduled instructor tasks due for execution")
|
||||
for schedule in due_schedules:
|
||||
try:
|
||||
log.info(f"Attempting to queue scheduled task with id '{schedule.task.id}'")
|
||||
|
||||
@@ -526,7 +526,7 @@ def submit_scheduled_task(schedule):
|
||||
# turn this into the format Celery expects
|
||||
task_args = [schedule.task.id, task_arguments]
|
||||
# submit the task
|
||||
log.info(f"Submitting scheduled task {schedule.task.id} for processing")
|
||||
log.info(f"Submitting scheduled task '{schedule.task.id}' for processing")
|
||||
task_class.apply_async(task_args, task_id=schedule.task.task_id)
|
||||
except Exception as error: # pylint: disable=broad-except
|
||||
# broad except here to make sure we cast a wide net for tasks with issues that can't be processed
|
||||
|
||||
@@ -0,0 +1,14 @@
|
||||
"""
|
||||
Command to process scheduled instructor tasks.
|
||||
"""
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from lms.djangoapps.instructor_task.api import process_scheduled_instructor_tasks
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
"""
|
||||
Command to process Instructor Tasks in the `SCHEDULED` state that are due for execution.
|
||||
"""
|
||||
def handle(self, *args, **options):
|
||||
process_scheduled_instructor_tasks()
|
||||
@@ -28,7 +28,7 @@ from lms.djangoapps.instructor_task.api import (
|
||||
generate_certificates_for_students,
|
||||
get_instructor_task_history,
|
||||
get_running_instructor_tasks,
|
||||
process_scheduled_tasks,
|
||||
process_scheduled_instructor_tasks,
|
||||
regenerate_certificates,
|
||||
submit_bulk_course_email,
|
||||
submit_calculate_may_enroll_csv,
|
||||
@@ -508,7 +508,7 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa
|
||||
]
|
||||
|
||||
with LogCapture() as log:
|
||||
process_scheduled_tasks()
|
||||
process_scheduled_instructor_tasks()
|
||||
|
||||
mock_submit_scheduled_task.assert_called_once_with(due_instructor_task_schedule)
|
||||
log.check_present((LOG_PATH, "INFO", expected_messages[0]),)
|
||||
@@ -528,7 +528,7 @@ class InstructorTaskCourseSubmitTest(TestReportMixin, InstructorTaskCourseTestCa
|
||||
]
|
||||
|
||||
with LogCapture() as log:
|
||||
process_scheduled_tasks()
|
||||
process_scheduled_instructor_tasks()
|
||||
|
||||
log.check_present((LOG_PATH, "ERROR", expected_messages[0]),)
|
||||
|
||||
|
||||
@@ -185,7 +185,7 @@ class ScheduledInstructorTaskSubmissionTests(InstructorTaskCourseTestCase):
|
||||
expected_task_arguments = json.loads(schedule.task_args)
|
||||
expected_task_args = [schedule.task.id, expected_task_arguments]
|
||||
expected_messages = [
|
||||
f"Submitting scheduled task {schedule.task.id} for processing",
|
||||
f"Submitting scheduled task '{schedule.task.id}' for processing",
|
||||
]
|
||||
|
||||
with LogCapture() as log:
|
||||
|
||||
Reference in New Issue
Block a user