Ran pyupgrade on lms/djangoapps/instructor_analytics Ran pyupgrade on lms/djangoapps/instructor_task Ran pyupgrade on lms/djangoapps/learner_dashboard
100 lines
4.6 KiB
Python
100 lines
4.6 KiB
Python
"""
|
|
Base class for Instructor celery tasks.
|
|
"""
|
|
|
|
|
|
import logging
|
|
|
|
from celery import Task
|
|
from celery.states import FAILURE, SUCCESS
|
|
|
|
from lms.djangoapps.instructor_task.models import InstructorTask
|
|
|
|
# Define the logger for use within tasks.
|
|
# Registered under the name 'edx.celery.task'; the on_success/on_failure
# callbacks of BaseInstructorTask below emit their messages through it.
TASK_LOG = logging.getLogger('edx.celery.task')
|
|
|
|
|
|
class BaseInstructorTask(Task):  # lint-amnesty, pylint: disable=abstract-method
    """
    Celery task base class that mirrors a task's outcome into its InstructorTask entry.

    Every task built on this class must receive the `entry_id` (the primary key of
    the InstructorTask row that represents the task) as its first positional argument.
    When the wrapped task finishes, the matching callback below records the result on
    that row so the task can be monitored:

      * `on_success` stores the progress dict and marks the entry SUCCESS.
      * `on_failure` stores the exception details and marks the entry FAILURE.

    These are the same states Celery itself would assign once the task returns
    (SUCCESS on a normal return, FAILURE when an exception escapes the task).

    Any remaining task arguments are passed through to perform_module_state_update
    and are documented there.
    """
    abstract = True

    def on_success(self, task_progress, task_id, args, kwargs):  # lint-amnesty, pylint: disable=arguments-differ
        """
        Record a successful run on the InstructorTask entry for this task.

        Writes task_output and task_state, unless the task merely spawned subtasks
        to do the real work, in which case nothing is written here.

        `task_progress` is expected to be a dict holding the task's result:

          'attempted': number of attempts made
          'succeeded': number of attempts that "succeeded"
          'skipped': number of attempts that "skipped"
          'failed': number of attempts that "failed"
          'total': number of possible subtasks to attempt
          'action_name': user-visible verb to use in status messages. Should be past-tense.
              Pass-through of input `action_name`.
          'duration_ms': how long the task has (or had) been running.

        That dict is JSON-serialized and saved in the entry's task_output column.

        `args` are the positional arguments the task was invoked with; `args[0]`
        must be the InstructorTask primary key. `task_id` is only used for logging.
        """
        TASK_LOG.debug('Task %s: success returned with progress: %s', task_id, task_progress)

        # In principle the InstructorTask could be located from task_id alone, but
        # since the entry's primary key is always the first task argument, that is
        # what gets used. The row is assumed to exist: had it been missing, the
        # task would have failed before reaching this callback.
        instructor_task_pk = args[0]
        task_entry = InstructorTask.objects.get(pk=instructor_task_pk)

        # When subtasks were registered, they are responsible for updating
        # task_state themselves, so this callback leaves the entry alone.
        if len(task_entry.subtasks) != 0:
            return

        # No subtasks: the work is complete, so persist the final outcome.
        task_entry.task_output = InstructorTask.create_output_for_success(task_progress)
        task_entry.task_state = SUCCESS
        task_entry.save_now()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """
        Record a failed run on the InstructorTask entry for this task.

        Celery catches an exception raised inside the task and hands it to this
        callback. The exception and traceback taken from `einfo` are saved on the
        entry as a JSON-serialized dict in the task_output column, with the keys:

          'exception': type of exception object
          'message': error message from exception object
          'traceback': traceback information (truncated if necessary)

        Progress made inside the task before the failure (e.g. attempted,
        succeeded, etc.) cannot be recorded on this path.

        `args` are the positional arguments the task was invoked with; `args[0]`
        must be the InstructorTask primary key. `task_id` is only used for logging.
        If no InstructorTask row exists for that key, an error is logged and
        nothing else happens.
        """
        TASK_LOG.debug('Task %s: failure returned', task_id)

        instructor_task_pk = args[0]
        try:
            task_entry = InstructorTask.objects.get(pk=instructor_task_pk)
        except InstructorTask.DoesNotExist:
            # Without an InstructorTask row there is nothing to update.
            TASK_LOG.error("Task (%s) has no InstructorTask object for id %s", task_id, instructor_task_pk)
            return

        TASK_LOG.warning("Task (%s) failed", task_id, exc_info=True)
        task_entry.task_output = InstructorTask.create_output_for_failure(einfo.exception, einfo.traceback)
        task_entry.task_state = FAILURE
        task_entry.save_now()
|