356 lines
14 KiB
Python
356 lines
14 KiB
Python
"""
|
|
WE'RE USING MIGRATIONS!
|
|
|
|
If you make changes to this model, be sure to create an appropriate migration
|
|
file and check it in at the same time as your model changes. To do that,
|
|
|
|
1. Go to the edx-platform dir
|
|
2. ./manage.py schemamigration instructor_task --auto description_of_your_change
|
|
3. Add the migration file created in edx-platform/lms/djangoapps/instructor_task/migrations/
|
|
|
|
|
|
ASSUMPTIONS: modules have unique IDs, even across different module_types
|
|
|
|
"""
|
|
import csv
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os.path
|
|
from uuid import uuid4
|
|
|
|
from botocore.exceptions import ClientError
|
|
from django.apps import apps
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
|
|
from django.core.files.base import ContentFile
|
|
from django.db import models, transaction
|
|
|
|
from django.utils.translation import gettext as _
|
|
from model_utils.models import TimeStampedModel
|
|
from opaque_keys.edx.django.models import CourseKeyField
|
|
from simple_history.models import HistoricalRecords
|
|
|
|
from openedx.core.storage import get_storage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# define custom states used by InstructorTask
|
|
QUEUING = 'QUEUING'
|
|
PROGRESS = 'PROGRESS'
|
|
SCHEDULED = 'SCHEDULED'
|
|
TASK_INPUT_LENGTH = 10000
|
|
DJANGO_STORE_STORAGE_CLASS = 'storages.backends.s3boto3.S3Boto3Storage'
|
|
|
|
|
|
class InstructorTask(models.Model):
|
|
"""
|
|
Stores information about background tasks that have been submitted to
|
|
perform work by an instructor (or course staff).
|
|
Examples include grading and rescoring.
|
|
|
|
`task_type` identifies the kind of task being performed, e.g. rescoring.
|
|
`course_id` uses the course run's unique id to identify the course.
|
|
`task_key` stores relevant input arguments encoded into key value for testing to see
|
|
if the task is already running (together with task_type and course_id).
|
|
`task_input` stores input arguments as JSON-serialized dict, for reporting purposes.
|
|
Examples include url of problem being rescored, id of student if only one student being rescored.
|
|
|
|
`task_id` stores the id used by celery for the background task.
|
|
`task_state` stores the last known state of the celery task
|
|
`task_output` stores the output of the celery task.
|
|
Format is a JSON-serialized dict. Content varies by task_type and task_state.
|
|
|
|
`requester` stores id of user who submitted the task
|
|
`created` stores date that entry was first created
|
|
`updated` stores date that entry was last modified
|
|
|
|
.. no_pii:
|
|
"""
|
|
class Meta:
|
|
app_label = "instructor_task"
|
|
|
|
task_type = models.CharField(max_length=50, db_index=True)
|
|
course_id = CourseKeyField(max_length=255, db_index=True)
|
|
task_key = models.CharField(max_length=255, db_index=True)
|
|
task_input = models.TextField()
|
|
task_id = models.CharField(max_length=255, db_index=True) # max_length from celery_taskmeta
|
|
task_state = models.CharField(max_length=50, null=True, db_index=True) # max_length from celery_taskmeta
|
|
task_output = models.CharField(max_length=1024, null=True)
|
|
requester = models.ForeignKey(User, db_index=True, on_delete=models.CASCADE)
|
|
created = models.DateTimeField(auto_now_add=True, null=True)
|
|
updated = models.DateTimeField(auto_now=True)
|
|
subtasks = models.TextField(blank=True) # JSON dictionary
|
|
|
|
def __repr__(self):
|
|
return 'InstructorTask<{!r}>'.format({
|
|
'task_type': self.task_type,
|
|
'course_id': self.course_id,
|
|
'task_input': self.task_input,
|
|
'task_id': self.task_id,
|
|
'task_state': self.task_state,
|
|
'task_output': self.task_output,
|
|
})
|
|
|
|
def __str__(self):
|
|
return str(repr(self))
|
|
|
|
@classmethod
|
|
def create(cls, course_id, task_type, task_key, task_input, requester):
|
|
"""
|
|
Create an instance of InstructorTask.
|
|
"""
|
|
# create the task_id here, and pass it into celery:
|
|
task_id = str(uuid4())
|
|
json_task_input = json.dumps(task_input)
|
|
|
|
# check length of task_input, and return an exception if it's too long
|
|
if len(json_task_input) > TASK_INPUT_LENGTH:
|
|
logger.error(
|
|
'Task input longer than: `%s` for `%s` of course: `%s`',
|
|
TASK_INPUT_LENGTH,
|
|
task_type,
|
|
course_id
|
|
)
|
|
error_msg = _('An error has occurred. Task was not created.')
|
|
raise AttributeError(error_msg)
|
|
|
|
# create the task, then save it:
|
|
instructor_task = cls(
|
|
course_id=course_id,
|
|
task_type=task_type,
|
|
task_id=task_id,
|
|
task_key=task_key,
|
|
task_input=json_task_input,
|
|
task_state=QUEUING,
|
|
requester=requester
|
|
)
|
|
instructor_task.save_now()
|
|
|
|
return instructor_task
|
|
|
|
@transaction.atomic
|
|
def save_now(self):
|
|
"""
|
|
Writes InstructorTask immediately, ensuring the transaction is committed.
|
|
"""
|
|
self.save()
|
|
|
|
@staticmethod
|
|
def create_output_for_success(returned_result):
|
|
"""
|
|
Converts successful result to output format.
|
|
|
|
Raises a ValueError exception if the output is too long.
|
|
"""
|
|
# In future, there should be a check here that the resulting JSON
|
|
# will fit in the column. In the meantime, just return an exception.
|
|
json_output = json.dumps(returned_result)
|
|
if len(json_output) > 1023:
|
|
raise ValueError(f"Length of task output is too long: {json_output}")
|
|
return json_output
|
|
|
|
@staticmethod
|
|
def create_output_for_failure(exception, traceback_string):
|
|
"""
|
|
Converts failed result information to output format.
|
|
|
|
Traceback information is truncated or not included if it would result in an output string
|
|
that would not fit in the database. If the output is still too long, then the
|
|
exception message is also truncated.
|
|
|
|
Truncation is indicated by adding "..." to the end of the value.
|
|
"""
|
|
tag = '...'
|
|
task_progress = {'exception': type(exception).__name__, 'message': str(exception)}
|
|
if traceback_string is not None:
|
|
# truncate any traceback that goes into the InstructorTask model:
|
|
task_progress['traceback'] = traceback_string
|
|
json_output = json.dumps(task_progress)
|
|
# if the resulting output is too long, then first shorten the
|
|
# traceback, and then the message, until it fits.
|
|
too_long = len(json_output) - 1023
|
|
if too_long > 0:
|
|
if traceback_string is not None:
|
|
if too_long >= len(traceback_string) - len(tag):
|
|
# remove the traceback entry entirely (so no key or value)
|
|
del task_progress['traceback']
|
|
too_long -= (len(traceback_string) + len('traceback'))
|
|
else:
|
|
# truncate the traceback:
|
|
task_progress['traceback'] = traceback_string[(too_long + len(tag)):] + tag
|
|
too_long = 0
|
|
if too_long > 0:
|
|
# we need to shorten the message:
|
|
task_progress['message'] = task_progress['message'][(too_long + len(tag)):] + tag
|
|
json_output = json.dumps(task_progress)
|
|
return json_output
|
|
|
|
@staticmethod
|
|
def create_output_for_revoked():
|
|
"""Creates standard message to store in output format for revoked tasks."""
|
|
return json.dumps({'message': 'Task revoked before running'})
|
|
|
|
|
|
class InstructorTaskSchedule(TimeStampedModel):
|
|
"""
|
|
A database model to store information about _when_ to execute a scheduled background task.
|
|
|
|
The primary use case is to allow instructors to schedule their email messages (authored with the bulk course email
|
|
tool) to be sent at a later date and time.
|
|
|
|
.. no_pii:
|
|
"""
|
|
class Meta:
|
|
app_label = "instructor_task"
|
|
|
|
task = models.OneToOneField(InstructorTask, on_delete=models.DO_NOTHING)
|
|
task_args = models.TextField(null=False, blank=False)
|
|
task_due = models.DateTimeField(null=False)
|
|
|
|
if 'instructor_task' in apps.app_configs:
|
|
history = HistoricalRecords()
|
|
|
|
|
|
class ReportStore:
|
|
"""
|
|
Simple abstraction layer that can fetch and store CSV files for reports
|
|
download. Should probably refactor later to create a ReportFile object that
|
|
can simply be appended to for the sake of memory efficiency, rather than
|
|
passing in the whole dataset. Doing that for now just because it's simpler.
|
|
"""
|
|
@classmethod
|
|
def from_config(cls, config_name):
|
|
"""
|
|
Return one of the ReportStore subclasses depending on django
|
|
configuration. Look at subclasses for expected configuration.
|
|
"""
|
|
# Convert old configuration parameters to those expected by
|
|
# DjangoStorageReportStore for backward compatibility
|
|
config = getattr(settings, config_name, {})
|
|
storage_type = config.get('STORAGE_TYPE', '').lower()
|
|
if storage_type == 's3':
|
|
return DjangoStorageReportStore(
|
|
storage_class=DJANGO_STORE_STORAGE_CLASS,
|
|
storage_kwargs={
|
|
'bucket_name': config['BUCKET'],
|
|
'location': config['ROOT_PATH'],
|
|
'custom_domain': config.get("CUSTOM_DOMAIN", None),
|
|
'querystring_expire': 300,
|
|
'gzip': True,
|
|
},
|
|
)
|
|
elif storage_type == 'localfs':
|
|
return DjangoStorageReportStore(
|
|
storage_class='django.core.files.storage.FileSystemStorage',
|
|
storage_kwargs={
|
|
'location': config['ROOT_PATH'],
|
|
},
|
|
)
|
|
return DjangoStorageReportStore.from_config(config_name)
|
|
|
|
def _get_utf8_encoded_rows(self, rows):
|
|
"""
|
|
Given a list of `rows` containing unicode strings, return a
|
|
new list of rows with those strings encoded as utf-8 for CSV
|
|
compatibility.
|
|
"""
|
|
for row in rows:
|
|
yield [str(item) for item in row]
|
|
|
|
|
|
class DjangoStorageReportStore(ReportStore):
|
|
"""
|
|
ReportStore implementation that delegates to django's storage api.
|
|
"""
|
|
def __init__(self, storage_class=None, storage_kwargs=None):
|
|
if storage_kwargs is None:
|
|
storage_kwargs = {}
|
|
|
|
self.storage = get_storage(storage_class, **storage_kwargs)
|
|
|
|
@classmethod
|
|
def from_config(cls, config_name):
|
|
"""
|
|
By default, the default file storage specified by the `STORAGES['default']`
|
|
setting will be used. To configure the storage used, add a dict in
|
|
settings with the following fields::
|
|
|
|
STORAGE_CLASS : The import path of the storage class to use. If
|
|
not set, the STORAGES['default']['BACKEND'] setting will be used.
|
|
STORAGE_KWARGS : An optional dict of kwargs to pass to the storage
|
|
constructor. This can be used to specify a
|
|
different S3 bucket or root path, for example.
|
|
|
|
Reference the setting name when calling `.from_config`.
|
|
"""
|
|
return cls(
|
|
getattr(settings, config_name).get('STORAGE_CLASS'),
|
|
getattr(settings, config_name).get('STORAGE_KWARGS'),
|
|
)
|
|
|
|
def store(self, course_id, filename, buff, parent_dir=''):
|
|
"""
|
|
Store the contents of `buff` in a directory determined by hashing
|
|
`course_id`, and name the file `filename`. `buff` can be any file-like
|
|
object, ready to be read from the beginning.
|
|
"""
|
|
path = self.path_to(course_id, filename, parent_dir)
|
|
# See https://github.com/boto/boto/issues/2868
|
|
# Boto doesn't play nice with unicode in python3
|
|
buff_contents = buff.read()
|
|
|
|
if not isinstance(buff_contents, bytes):
|
|
buff_contents = buff_contents.encode('utf-8')
|
|
|
|
buff = ContentFile(buff_contents)
|
|
|
|
self.storage.save(path, buff)
|
|
|
|
def store_rows(self, course_id, filename, rows, parent_dir=''):
|
|
"""
|
|
Given a course_id, filename, and rows (each row is an iterable of
|
|
strings), write the rows to the storage backend in csv format.
|
|
"""
|
|
output_buffer = ContentFile('')
|
|
csvwriter = csv.writer(output_buffer)
|
|
csvwriter.writerows(self._get_utf8_encoded_rows(rows))
|
|
output_buffer.seek(0)
|
|
self.store(course_id, filename, output_buffer, parent_dir)
|
|
|
|
def links_for(self, course_id):
|
|
"""
|
|
For a given `course_id`, return a list of `(filename, url)` tuples.
|
|
Calls the `url` method of the underlying storage backend. Returned
|
|
urls can be plugged straight into an href
|
|
"""
|
|
course_dir = self.path_to(course_id)
|
|
try:
|
|
_, filenames = self.storage.listdir(course_dir)
|
|
except OSError:
|
|
# Django's FileSystemStorage fails with an OSError if the course
|
|
# dir does not exist; other storage types return an empty list.
|
|
return []
|
|
except ClientError as ex:
|
|
logger.error(
|
|
'Fetching files failed for course: %s, status: %s, reason: %s',
|
|
course_id,
|
|
ex.response.get('Error'), ex.response.get('Error', {}).get('Message')
|
|
)
|
|
return []
|
|
|
|
files = [(filename, os.path.join(course_dir, filename)) for filename in filenames]
|
|
files.sort(key=lambda f: self.storage.get_modified_time(f[1]), reverse=True)
|
|
return [
|
|
(filename, self.storage.url(full_path))
|
|
for filename, full_path in files
|
|
]
|
|
|
|
def path_to(self, course_id, filename='', parent_dir=''):
|
|
"""
|
|
Return the full path to a given file for a given course.
|
|
"""
|
|
hashed_course_id = hashlib.sha1(str(course_id).encode('utf-8')).hexdigest()
|
|
directory = parent_dir if bool(parent_dir) else hashed_course_id
|
|
return os.path.join(directory, filename)
|