Add support for counting and reporting skips in background tasks.
This commit is contained in:
@@ -12,8 +12,9 @@ file and check it in at the same time as your model changes. To do that,
|
||||
|
||||
"""
|
||||
import logging
|
||||
from django.db import models
|
||||
from django.db import models, transaction
|
||||
from django.contrib.auth.models import User
|
||||
from html_to_text import html_to_text
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -33,9 +34,11 @@ class Email(models.Model):
|
||||
class Meta: # pylint: disable=C0111
|
||||
abstract = True
|
||||
|
||||
|
||||
SEND_TO_MYSELF = 'myself'
|
||||
SEND_TO_STAFF = 'staff'
|
||||
SEND_TO_ALL = 'all'
|
||||
TO_OPTIONS = [SEND_TO_MYSELF, SEND_TO_STAFF, SEND_TO_ALL]
|
||||
|
||||
|
||||
class CourseEmail(Email, models.Model):
|
||||
@@ -51,17 +54,66 @@ class CourseEmail(Email, models.Model):
|
||||
# * All: This sends an email to anyone enrolled in the course, with any role
|
||||
# (student, staff, or instructor)
|
||||
#
|
||||
TO_OPTIONS = (
|
||||
TO_OPTION_CHOICES = (
|
||||
(SEND_TO_MYSELF, 'Myself'),
|
||||
(SEND_TO_STAFF, 'Staff and instructors'),
|
||||
(SEND_TO_ALL, 'All')
|
||||
)
|
||||
course_id = models.CharField(max_length=255, db_index=True)
|
||||
to_option = models.CharField(max_length=64, choices=TO_OPTIONS, default=SEND_TO_MYSELF)
|
||||
to_option = models.CharField(max_length=64, choices=TO_OPTION_CHOICES, default=SEND_TO_MYSELF)
|
||||
|
||||
def __unicode__(self):
|
||||
return self.subject
|
||||
|
||||
@classmethod
|
||||
def create(cls, course_id, sender, to_option, subject, html_message, text_message=None):
|
||||
"""
|
||||
Create an instance of CourseEmail.
|
||||
|
||||
The CourseEmail.save_now method makes sure the CourseEmail entry is committed.
|
||||
When called from any view that is wrapped by TransactionMiddleware,
|
||||
and thus in a "commit-on-success" transaction, an autocommit buried within here
|
||||
will cause any pending transaction to be committed by a successful
|
||||
save here. Any future database operations will take place in a
|
||||
separate transaction.
|
||||
"""
|
||||
# automatically generate the stripped version of the text from the HTML markup:
|
||||
if text_message is None:
|
||||
text_message = html_to_text(html_message)
|
||||
|
||||
# perform some validation here:
|
||||
if to_option not in TO_OPTIONS:
|
||||
fmt = 'Course email being sent to unrecognized to_option: "{to_option}" for "{course}", subject "{subject}"'
|
||||
msg = fmt.format(to_option=to_option, course=course_id, subject=subject)
|
||||
raise ValueError(msg)
|
||||
|
||||
# create the task, then save it immediately:
|
||||
course_email = cls(
|
||||
course_id=course_id,
|
||||
sender=sender,
|
||||
to_option=to_option,
|
||||
subject=subject,
|
||||
html_message=html_message,
|
||||
text_message=text_message,
|
||||
)
|
||||
course_email.save_now()
|
||||
|
||||
return course_email
|
||||
|
||||
@transaction.autocommit
|
||||
def save_now(self):
|
||||
"""
|
||||
Writes InstructorTask immediately, ensuring the transaction is committed.
|
||||
|
||||
Autocommit annotation makes sure the database entry is committed.
|
||||
When called from any view that is wrapped by TransactionMiddleware,
|
||||
and thus in a "commit-on-success" transaction, this autocommit here
|
||||
will cause any pending transaction to be committed by a successful
|
||||
save here. Any future database operations will take place in a
|
||||
separate transaction.
|
||||
"""
|
||||
self.save()
|
||||
|
||||
|
||||
class Optout(models.Model):
|
||||
"""
|
||||
|
||||
@@ -7,17 +7,22 @@ import re
|
||||
from uuid import uuid4
|
||||
from time import time, sleep
|
||||
import json
|
||||
from sys import exc_info
|
||||
from traceback import format_exc
|
||||
|
||||
from dogapi import dog_stats_api
|
||||
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError
|
||||
|
||||
from celery import task, current_task, group
|
||||
from celery.utils.log import get_task_logger
|
||||
from celery.states import SUCCESS, FAILURE
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User, Group
|
||||
from django.core.mail import EmailMultiAlternatives, get_connection
|
||||
from django.http import Http404
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.db import transaction
|
||||
|
||||
from bulk_email.models import (
|
||||
CourseEmail, Optout, CourseEmailTemplate,
|
||||
@@ -99,11 +104,10 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
try:
|
||||
email_obj = CourseEmail.objects.get(id=email_id)
|
||||
except CourseEmail.DoesNotExist as exc:
|
||||
# The retry behavior here is necessary because of a race condition between the commit of the transaction
|
||||
# that creates this CourseEmail row and the celery pipeline that starts this task.
|
||||
# We might possibly want to move the blocking into the view function rather than have it in this task.
|
||||
# log.warning("Failed to get CourseEmail with id %s, retry %d", email_id, current_task.request.retries)
|
||||
# raise delegate_email_batches.retry(arg=[email_id, user_id], exc=exc)
|
||||
# The CourseEmail object should be committed in the view function before the task
|
||||
# is submitted and reaches this point. It is possible to add retry behavior here,
|
||||
# to keep trying until the object is actually committed by the view function's return,
|
||||
# but it's cleaner to just expect to be done.
|
||||
log.warning("Failed to get CourseEmail with id %s", email_id)
|
||||
raise
|
||||
|
||||
@@ -123,13 +127,18 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
recipient_qset = get_recipient_queryset(user_id, to_option, course_id, course.location)
|
||||
total_num_emails = recipient_qset.count()
|
||||
|
||||
log.info("Preparing to queue emails to %d recipient(s) for course %s, email %s, to_option %s",
|
||||
total_num_emails, course_id, email_id, to_option)
|
||||
|
||||
# At this point, we have some status that we can report, as to the magnitude of the overall
|
||||
# task. That is, we know the total. Set that, and our subtasks should work towards that goal.
|
||||
# Note that we add start_time in here, so that it can be used
|
||||
# by subtasks to calculate duration_ms values:
|
||||
progress = {'action_name': action_name,
|
||||
'attempted': 0,
|
||||
'updated': 0,
|
||||
'failed': 0,
|
||||
'skipped': 0,
|
||||
'succeeded': 0,
|
||||
'total': total_num_emails,
|
||||
'duration_ms': int(0),
|
||||
'start_time': time(),
|
||||
@@ -156,6 +165,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
subtask_id = str(uuid4())
|
||||
subtask_id_list.append(subtask_id)
|
||||
task_list.append(send_course_email.subtask((
|
||||
entry_id,
|
||||
email_id,
|
||||
to_list,
|
||||
global_email_context,
|
||||
@@ -166,46 +176,95 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
|
||||
# Before we actually start running the tasks we've defined,
|
||||
# the InstructorTask needs to be updated with their information.
|
||||
# So at this point, we need to update the InstructorTask object here,
|
||||
# not in the return.
|
||||
# So we update the InstructorTask object here, not in the return.
|
||||
# The monitoring code knows that it shouldn't go to the InstructorTask's task's
|
||||
# Result for its progress when there are subtasks. So we accumulate
|
||||
# the results of each subtask as it completes into the InstructorTask.
|
||||
entry.task_output = InstructorTask.create_output_for_success(progress)
|
||||
|
||||
# TODO: the monitoring may need to track a different value here to know
|
||||
# that it shouldn't go to the InstructorTask's task's Result for its
|
||||
# progress. It might be that this is getting saved.
|
||||
# It might be enough, on the other hand, for the monitoring code to see
|
||||
# that there are subtasks, and that it can scan these for the overall
|
||||
# status. (And that it shouldn't clobber the progress that is being
|
||||
# accumulated.) If there are no subtasks, then work as is current.
|
||||
entry.task_state = PROGRESS
|
||||
|
||||
# now write out the subtasks information.
|
||||
num_subtasks = len(subtask_id_list)
|
||||
subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
|
||||
entry.subtasks = json.dumps(subtask_status)
|
||||
subtask_dict = {'total': num_subtasks, 'succeeded': 0, 'failed': 0, 'status': subtask_status}
|
||||
entry.subtasks = json.dumps(subtask_dict)
|
||||
|
||||
# and save the entry immediately, before any subtasks actually start work:
|
||||
entry.save_now()
|
||||
|
||||
log.info("Preparing to queue %d email tasks for course %s, email %s, to %s",
|
||||
num_subtasks, course_id, email_id, to_option)
|
||||
|
||||
# now group the subtasks, and start them running:
|
||||
task_group = group(task_list)
|
||||
task_group_result = task_group.apply_async()
|
||||
task_group.apply_async()
|
||||
|
||||
# ISSUE: we can return this result now, but it's not really the result for this task.
|
||||
# So if we use the task_id to fetch a task result, we won't get this one. But it
|
||||
# might still work. The caller just has to hold onto this, and access it in some way.
|
||||
# Ugh. That seems unlikely...
|
||||
# return task_group_result
|
||||
|
||||
# Still want to return progress here, as this is what will be stored in the
|
||||
# We want to return progress here, as this is what will be stored in the
|
||||
# AsyncResult for the parent task as its return value.
|
||||
# TODO: Humph. But it will be marked as SUCCEEDED. And have
|
||||
# this return value as it's "result". So be it. The InstructorTask
|
||||
# will not match, because it will have different info.
|
||||
# The Result will then be marked as SUCCEEDED, and have this return value as it's "result".
|
||||
# That's okay, for the InstructorTask will have the "real" status.
|
||||
return progress
|
||||
|
||||
|
||||
def _get_current_task():
|
||||
"""Stub to make it easier to test without actually running Celery"""
|
||||
return current_task
|
||||
|
||||
|
||||
@transaction.commit_manually
|
||||
def _update_subtask_status(entry_id, current_task_id, status, subtask_result):
|
||||
"""
|
||||
Update the status of the subtask in the parent InstructorTask object tracking its progress.
|
||||
"""
|
||||
log.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
|
||||
current_task_id, entry_id, subtask_result)
|
||||
|
||||
try:
|
||||
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
|
||||
subtask_dict = json.loads(entry.subtasks)
|
||||
subtask_status = subtask_dict['status']
|
||||
if current_task_id not in subtask_status:
|
||||
# unexpected error -- raise an exception?
|
||||
log.warning("Unexpected task_id '%s': unable to update status for email subtask of instructor task %d",
|
||||
current_task_id, entry_id)
|
||||
pass
|
||||
subtask_status[current_task_id] = status
|
||||
# now update the parent task progress
|
||||
task_progress = json.loads(entry.task_output)
|
||||
start_time = task_progress['start_time']
|
||||
task_progress['duration_ms'] = int((time() - start_time) * 1000)
|
||||
if subtask_result is not None:
|
||||
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
|
||||
task_progress[statname] += subtask_result[statname]
|
||||
# now figure out if we're actually done (i.e. this is the last task to complete)
|
||||
# (This might be easier by just maintaining a counter, rather than scanning the
|
||||
# entire subtask_status dict.)
|
||||
if status == SUCCESS:
|
||||
subtask_dict['succeeded'] += 1
|
||||
else:
|
||||
subtask_dict['failed'] += 1
|
||||
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
|
||||
if num_remaining <= 0:
|
||||
# we're done with the last task: update the parent status to indicate that:
|
||||
entry.task_state = SUCCESS
|
||||
entry.subtasks = json.dumps(subtask_dict)
|
||||
entry.task_output = InstructorTask.create_output_for_success(task_progress)
|
||||
|
||||
log.info("Task output updated to %s for email subtask %s of instructor task %d",
|
||||
entry.task_output, current_task_id, entry_id)
|
||||
|
||||
log.info("about to save....")
|
||||
entry.save()
|
||||
except:
|
||||
log.exception("Unexpected error while updating InstructorTask.")
|
||||
transaction.rollback()
|
||||
else:
|
||||
log.info("about to commit....")
|
||||
transaction.commit()
|
||||
|
||||
|
||||
@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102
|
||||
def send_course_email(email_id, to_list, global_email_context, throttle=False):
|
||||
def send_course_email(entry_id, email_id, to_list, global_email_context, throttle=False):
|
||||
"""
|
||||
Takes a primary id for a CourseEmail object and a 'to_list' of recipient objects--keys are
|
||||
'profile__name', 'email' (address), and 'pk' (in the user table).
|
||||
@@ -214,9 +273,31 @@ def send_course_email(email_id, to_list, global_email_context, throttle=False):
|
||||
Sends to all addresses contained in to_list. Emails are sent multi-part, in both plain
|
||||
text and html.
|
||||
"""
|
||||
course_title = global_email_context['course_title']
|
||||
with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
|
||||
_send_course_email(email_id, to_list, global_email_context, throttle)
|
||||
# Get entry here, as a sanity check that it actually exists. We won't actually do anything
|
||||
# with it right away.
|
||||
InstructorTask.objects.get(pk=entry_id)
|
||||
current_task_id = _get_current_task().request.id
|
||||
|
||||
log.info("Preparing to send email as subtask %s for instructor task %d",
|
||||
current_task_id, entry_id)
|
||||
|
||||
try:
|
||||
course_title = global_email_context['course_title']
|
||||
with dog_stats_api.timer('course_email.single_task.time.overall', tags=[_statsd_tag(course_title)]):
|
||||
course_email_result = _send_course_email(email_id, to_list, global_email_context, throttle)
|
||||
# Assume that if we get here without a raise, the task was successful.
|
||||
# Update the InstructorTask object that is storing its progress.
|
||||
_update_subtask_status(entry_id, current_task_id, SUCCESS, course_email_result)
|
||||
|
||||
except Exception:
|
||||
# try to write out the failure to the entry before failing
|
||||
_, exception, traceback = exc_info()
|
||||
traceback_string = format_exc(traceback) if traceback is not None else ''
|
||||
log.warning("background task (%s) failed: %s %s", current_task_id, exception, traceback_string)
|
||||
_update_subtask_status(entry_id, current_task_id, FAILURE, None)
|
||||
raise
|
||||
|
||||
return course_email_result
|
||||
|
||||
|
||||
def _send_course_email(email_id, to_list, global_email_context, throttle):
|
||||
@@ -284,6 +365,8 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
||||
sleep(0.2)
|
||||
|
||||
try:
|
||||
log.info('Email with id %s to be sent to %s', email_id, email)
|
||||
|
||||
with dog_stats_api.timer('course_email.single_send.time.overall', tags=[_statsd_tag(course_title)]):
|
||||
connection.send_messages([email_msg])
|
||||
|
||||
@@ -307,6 +390,8 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
||||
to_list.pop()
|
||||
|
||||
connection.close()
|
||||
# TODO: figure out how to get (or persist) real statistics for this task, so that reflects progress
|
||||
# made over multiple retries.
|
||||
return course_email_result(num_sent, num_error, num_optout)
|
||||
|
||||
except (SMTPDataError, SMTPConnectError, SMTPServerDisconnected) as exc:
|
||||
@@ -333,10 +418,10 @@ def _send_course_email(email_id, to_list, global_email_context, throttle):
|
||||
raise
|
||||
|
||||
|
||||
# This string format code is wrapped in this function to allow mocking for a unit test
|
||||
def course_email_result(num_sent, num_error, num_optout):
|
||||
"""Return the formatted result of course_email sending."""
|
||||
return "Sent {0}, Fail {1}, Optout {2}".format(num_sent, num_error, num_optout)
|
||||
"""Return the result of course_email sending as a dict (not a string)."""
|
||||
attempted = num_sent + num_error
|
||||
return {'attempted': attempted, 'succeeded': num_sent, 'skipped': num_optout, 'failed': num_error}
|
||||
|
||||
|
||||
def _statsd_tag(course_title):
|
||||
|
||||
@@ -12,6 +12,8 @@ from courseware.tests.tests import TEST_DATA_MONGO_MODULESTORE
|
||||
from student.tests.factories import UserFactory, GroupFactory, CourseEnrollmentFactory
|
||||
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.tests.factories import InstructorTaskFactory
|
||||
|
||||
from bulk_email.tasks import send_course_email
|
||||
from bulk_email.models import CourseEmail, Optout
|
||||
@@ -288,10 +290,18 @@ class TestEmailSendExceptions(ModuleStoreTestCase):
|
||||
"""
|
||||
Test that exceptions are handled correctly.
|
||||
"""
|
||||
def test_no_instructor_task(self):
|
||||
with self.assertRaises(InstructorTask.DoesNotExist):
|
||||
send_course_email(100, 101, [], {}, False)
|
||||
|
||||
def test_no_course_title(self):
|
||||
entry = InstructorTaskFactory.create(task_key='', task_id='dummy')
|
||||
with self.assertRaises(KeyError):
|
||||
send_course_email(entry.id, 101, [], {}, False)
|
||||
|
||||
def test_no_course_email_obj(self):
|
||||
# Make sure send_course_email handles CourseEmail.DoesNotExist exception.
|
||||
with self.assertRaises(KeyError):
|
||||
send_course_email(101, [], {}, False)
|
||||
|
||||
entry = InstructorTaskFactory.create(task_key='', task_id='dummy')
|
||||
with self.assertRaises(CourseEmail.DoesNotExist):
|
||||
send_course_email(101, [], {'course_title': 'Test'}, False)
|
||||
send_course_email(entry.id, 101, [], {'course_title': 'Test'}, False)
|
||||
|
||||
|
||||
@@ -30,6 +30,7 @@ from xmodule.modulestore.django import modulestore
|
||||
from xmodule.modulestore.exceptions import ItemNotFoundError
|
||||
from xmodule.html_module import HtmlDescriptor
|
||||
|
||||
from bulk_email.models import CourseEmail
|
||||
from courseware import grades
|
||||
from courseware.access import (has_access, get_access_group_name,
|
||||
course_beta_test_group_name)
|
||||
@@ -718,7 +719,6 @@ def instructor_dashboard(request, course_id):
|
||||
email_to_option = request.POST.get("to_option")
|
||||
email_subject = request.POST.get("subject")
|
||||
html_message = request.POST.get("message")
|
||||
text_message = html_to_text(html_message)
|
||||
|
||||
# TODO: make sure this is committed before submitting it to the task.
|
||||
# However, it should probably be enough to do the submit below, which
|
||||
@@ -727,15 +727,7 @@ def instructor_dashboard(request, course_id):
|
||||
# Actually, this should probably be moved out, so that all the validation logic
|
||||
# we might want to add to it can be added. There might also be something
|
||||
# that would permit validation of the email beforehand.
|
||||
email = CourseEmail(
|
||||
course_id=course_id,
|
||||
sender=request.user,
|
||||
to_option=email_to_option,
|
||||
subject=email_subject,
|
||||
html_message=html_message,
|
||||
text_message=text_message
|
||||
)
|
||||
email.save()
|
||||
email = CourseEmail.create(course_id, request.user, email_to_option, email_subject, html_message)
|
||||
|
||||
# TODO: make this into a task submission, so that the correct
|
||||
# InstructorTask object gets created (for monitoring purposes)
|
||||
@@ -746,6 +738,10 @@ def instructor_dashboard(request, course_id):
|
||||
else:
|
||||
email_msg = '<div class="msg msg-confirm"><p class="copy">Your email was successfully queued for sending.</p></div>'
|
||||
|
||||
elif "Show Background Email Task History" in action:
|
||||
message, datatable = get_background_task_table(course_id, task_type='bulk_course_email')
|
||||
msg += message
|
||||
|
||||
#----------------------------------------
|
||||
# psychometrics
|
||||
|
||||
@@ -870,6 +866,7 @@ def instructor_dashboard(request, course_id):
|
||||
|
||||
return render_to_response('courseware/instructor_dashboard.html', context)
|
||||
|
||||
|
||||
def _do_remote_gradebook(user, course, action, args=None, files=None):
|
||||
'''
|
||||
Perform remote gradebook action. Returns msg, datatable.
|
||||
@@ -1520,7 +1517,7 @@ def dump_grading_context(course):
|
||||
return msg
|
||||
|
||||
|
||||
def get_background_task_table(course_id, problem_url, student=None):
|
||||
def get_background_task_table(course_id, problem_url=None, student=None, task_type=None):
|
||||
"""
|
||||
Construct the "datatable" structure to represent background task history.
|
||||
|
||||
@@ -1531,14 +1528,17 @@ def get_background_task_table(course_id, problem_url, student=None):
|
||||
Returns a tuple of (msg, datatable), where the msg is a possible error message,
|
||||
and the datatable is the datatable to be used for display.
|
||||
"""
|
||||
history_entries = get_instructor_task_history(course_id, problem_url, student)
|
||||
history_entries = get_instructor_task_history(course_id, problem_url, student, task_type)
|
||||
datatable = {}
|
||||
msg = ""
|
||||
# first check to see if there is any history at all
|
||||
# (note that we don't have to check that the arguments are valid; it
|
||||
# just won't find any entries.)
|
||||
if (history_entries.count()) == 0:
|
||||
if student is not None:
|
||||
# TODO: figure out how to deal with task_type better here...
|
||||
if problem_url is None:
|
||||
msg += '<font color="red">Failed to find any background tasks for course "{course}".</font>'.format(course=course_id)
|
||||
elif student is not None:
|
||||
template = '<font color="red">Failed to find any background tasks for course "{course}", module "{problem}" and student "{student}".</font>'
|
||||
msg += template.format(course=course_id, problem=problem_url, student=student.username)
|
||||
else:
|
||||
@@ -1575,7 +1575,9 @@ def get_background_task_table(course_id, problem_url, student=None):
|
||||
task_message]
|
||||
datatable['data'].append(row)
|
||||
|
||||
if student is not None:
|
||||
if problem_url is None:
|
||||
datatable['title'] = "{course_id}".format(course_id=course_id)
|
||||
elif student is not None:
|
||||
datatable['title'] = "{course_id} > {location} > {student}".format(course_id=course_id,
|
||||
location=problem_url,
|
||||
student=student.username)
|
||||
|
||||
@@ -113,8 +113,16 @@ def _update_instructor_task(instructor_task, task_result):
|
||||
# Assume we don't always update the InstructorTask entry if we don't have to:
|
||||
entry_needs_saving = False
|
||||
task_output = None
|
||||
entry_needs_updating = True
|
||||
|
||||
if result_state in [PROGRESS, SUCCESS]:
|
||||
if result_state == SUCCESS and instructor_task.task_state == PROGRESS and len(instructor_task.subtasks) > 0:
|
||||
# This happens when running subtasks: the result object is marked with SUCCESS,
|
||||
# meaning that the subtasks have successfully been defined. However, the InstructorTask
|
||||
# will be marked as in PROGRESS, until the last subtask completes and marks it as SUCCESS.
|
||||
# We want to ignore the parent SUCCESS if subtasks are still running, and just trust the
|
||||
# contents of the InstructorTask.
|
||||
entry_needs_updating = False
|
||||
elif result_state in [PROGRESS, SUCCESS]:
|
||||
# construct a status message directly from the task result's result:
|
||||
# it needs to go back with the entry passed in.
|
||||
log.info("background task (%s), state %s: result: %s", task_id, result_state, returned_result)
|
||||
@@ -136,12 +144,13 @@ def _update_instructor_task(instructor_task, task_result):
|
||||
# save progress and state into the entry, even if it's not being saved:
|
||||
# when celery is run in "ALWAYS_EAGER" mode, progress needs to go back
|
||||
# with the entry passed in.
|
||||
instructor_task.task_state = result_state
|
||||
if task_output is not None:
|
||||
instructor_task.task_output = task_output
|
||||
if entry_needs_updating:
|
||||
instructor_task.task_state = result_state
|
||||
if task_output is not None:
|
||||
instructor_task.task_output = task_output
|
||||
|
||||
if entry_needs_saving:
|
||||
instructor_task.save()
|
||||
if entry_needs_saving:
|
||||
instructor_task.save()
|
||||
|
||||
|
||||
def get_updated_instructor_task(task_id):
|
||||
@@ -177,7 +186,7 @@ def get_status_from_instructor_task(instructor_task):
|
||||
'in_progress': boolean indicating if task is still running.
|
||||
'task_progress': dict containing progress information. This includes:
|
||||
'attempted': number of attempts made
|
||||
'updated': number of attempts that "succeeded"
|
||||
'succeeded': number of attempts that "succeeded"
|
||||
'total': number of possible subtasks to attempt
|
||||
'action_name': user-visible verb to use in status messages. Should be past-tense.
|
||||
'duration_ms': how long the task has (or had) been running.
|
||||
|
||||
@@ -31,6 +31,11 @@ TASK_LOG = get_task_logger(__name__)
|
||||
# define value to use when no task_id is provided:
|
||||
UNKNOWN_TASK_ID = 'unknown-task_id'
|
||||
|
||||
# define values for update functions to use to return status to perform_module_state_update
|
||||
UPDATE_STATUS_SUCCEEDED = 'succeeded'
|
||||
UPDATE_STATUS_FAILED = 'failed'
|
||||
UPDATE_STATUS_SKIPPED = 'skipped'
|
||||
|
||||
|
||||
class UpdateProblemModuleStateError(Exception):
|
||||
"""
|
||||
@@ -47,7 +52,6 @@ def _get_current_task():
|
||||
return current_task
|
||||
|
||||
|
||||
# def perform_module_state_update(course_id, module_state_key, student_identifier, update_fcn, action_name, filter_fcn):
|
||||
def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, task_input, action_name):
|
||||
"""
|
||||
Performs generic update by visiting StudentModule instances with the update_fcn provided.
|
||||
@@ -69,7 +73,9 @@ def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, tas
|
||||
The return value is a dict containing the task's results, with the following keys:
|
||||
|
||||
'attempted': number of attempts made
|
||||
'updated': number of attempts that "succeeded"
|
||||
'succeeded': number of attempts that "succeeded"
|
||||
'skipped': number of attempts that "skipped"
|
||||
'failed': number of attempts that "failed"
|
||||
'total': number of possible subtasks to attempt
|
||||
'action_name': user-visible verb to use in status messages. Should be past-tense.
|
||||
Pass-through of input `action_name`.
|
||||
@@ -111,8 +117,10 @@ def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, tas
|
||||
modules_to_update = filter_fcn(modules_to_update)
|
||||
|
||||
# perform the main loop
|
||||
num_updated = 0
|
||||
num_attempted = 0
|
||||
num_succeeded = 0
|
||||
num_skipped = 0
|
||||
num_failed = 0
|
||||
num_total = modules_to_update.count()
|
||||
|
||||
def get_task_progress():
|
||||
@@ -120,7 +128,9 @@ def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, tas
|
||||
current_time = time()
|
||||
progress = {'action_name': action_name,
|
||||
'attempted': num_attempted,
|
||||
'updated': num_updated,
|
||||
'succeeded': num_succeeded,
|
||||
'skipped': num_skipped,
|
||||
'failed': num_failed,
|
||||
'total': num_total,
|
||||
'duration_ms': int((current_time - start_time) * 1000),
|
||||
}
|
||||
@@ -133,10 +143,17 @@ def perform_module_state_update(update_fcn, filter_fcn, entry_id, course_id, tas
|
||||
# There is no try here: if there's an error, we let it throw, and the task will
|
||||
# be marked as FAILED, with a stack trace.
|
||||
with dog_stats_api.timer('instructor_tasks.module.time.step', tags=['action:{name}'.format(name=action_name)]):
|
||||
if update_fcn(module_descriptor, module_to_update):
|
||||
update_status = update_fcn(module_descriptor, module_to_update)
|
||||
if update_status == UPDATE_STATUS_SUCCEEDED:
|
||||
# If the update_fcn returns true, then it performed some kind of work.
|
||||
# Logging of failures is left to the update_fcn itself.
|
||||
num_updated += 1
|
||||
num_succeeded += 1
|
||||
elif update_status == UPDATE_STATUS_FAILED:
|
||||
num_failed += 1
|
||||
elif update_status == UPDATE_STATUS_SKIPPED:
|
||||
num_skipped += 1
|
||||
else:
|
||||
raise UpdateProblemModuleStateError("Unexpected update_status returned: {}".format(update_status))
|
||||
|
||||
# update task status:
|
||||
task_progress = get_task_progress()
|
||||
@@ -163,7 +180,9 @@ def run_main_task(entry_id, task_fcn, action_name, spawns_subtasks=False):
|
||||
If no exceptions are raised, a dict containing the task's result is returned, with the following keys:
|
||||
|
||||
'attempted': number of attempts made
|
||||
'updated': number of attempts that "succeeded"
|
||||
'succeeded': number of attempts that "succeeded"
|
||||
'skipped': number of attempts that "skipped"
|
||||
'failed': number of attempts that "failed"
|
||||
'total': number of possible subtasks to attempt
|
||||
'action_name': user-visible verb to use in status messages. Should be past-tense.
|
||||
Pass-through of input `action_name`.
|
||||
@@ -216,7 +235,6 @@ def run_main_task(entry_id, task_fcn, action_name, spawns_subtasks=False):
|
||||
|
||||
# Now do the work:
|
||||
with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
|
||||
# REMOVE: task_progress = visit_fcn(course_id, module_state_key, student_ident, update_fcn, action_name, filter_fcn)
|
||||
task_progress = task_fcn(entry_id, course_id, task_input, action_name)
|
||||
|
||||
# If we get here, we assume we've succeeded, so update the InstructorTask entry in anticipation.
|
||||
@@ -226,6 +244,7 @@ def run_main_task(entry_id, task_fcn, action_name, spawns_subtasks=False):
|
||||
# as part of the main task. There is probably some way to represent this more elegantly, but for
|
||||
# now, we will just use an explicit flag.
|
||||
if spawns_subtasks:
|
||||
# TODO: UPDATE THIS.
|
||||
# we change the rules here. If it's a task with subtasks running, then we
|
||||
# explicitly set its state, with the idea that progress will be updated
|
||||
# directly into the InstructorTask object, rather than into the parent task's
|
||||
@@ -370,15 +389,15 @@ def rescore_problem_module_state(xmodule_instance_args, module_descriptor, stude
|
||||
# don't consider these fatal, but false means that the individual call didn't complete:
|
||||
TASK_LOG.warning(u"error processing rescore call for course {course}, problem {loc} and student {student}: "
|
||||
"unexpected response {msg}".format(msg=result, course=course_id, loc=module_state_key, student=student))
|
||||
return False
|
||||
return UPDATE_STATUS_FAILED
|
||||
elif result['success'] not in ['correct', 'incorrect']:
|
||||
TASK_LOG.warning(u"error processing rescore call for course {course}, problem {loc} and student {student}: "
|
||||
"{msg}".format(msg=result['success'], course=course_id, loc=module_state_key, student=student))
|
||||
return False
|
||||
return UPDATE_STATUS_FAILED
|
||||
else:
|
||||
TASK_LOG.debug(u"successfully processed rescore call for course {course}, problem {loc} and student {student}: "
|
||||
"{msg}".format(msg=result['success'], course=course_id, loc=module_state_key, student=student))
|
||||
return True
|
||||
return UPDATE_STATUS_SUCCEEDED
|
||||
|
||||
|
||||
@transaction.autocommit
|
||||
@@ -386,8 +405,10 @@ def reset_attempts_module_state(xmodule_instance_args, _module_descriptor, stude
|
||||
"""
|
||||
Resets problem attempts to zero for specified `student_module`.
|
||||
|
||||
Always returns true, indicating success, if it doesn't raise an exception due to database error.
|
||||
Returns a status of UPDATE_STATUS_SUCCEEDED if a problem has non-zero attempts
|
||||
that are being reset, and UPDATE_STATUS_SKIPPED otherwise.
|
||||
"""
|
||||
update_status = UPDATE_STATUS_SKIPPED
|
||||
problem_state = json.loads(student_module.state) if student_module.state else {}
|
||||
if 'attempts' in problem_state:
|
||||
old_number_of_attempts = problem_state["attempts"]
|
||||
@@ -401,9 +422,9 @@ def reset_attempts_module_state(xmodule_instance_args, _module_descriptor, stude
|
||||
track_function = _get_track_function_for_task(student_module.student, xmodule_instance_args)
|
||||
event_info = {"old_attempts": old_number_of_attempts, "new_attempts": 0}
|
||||
track_function('problem_reset_attempts', event_info)
|
||||
update_status = UPDATE_STATUS_SUCCEEDED
|
||||
|
||||
# consider the reset to be successful, even if no update was performed. (It's just "optimized".)
|
||||
return True
|
||||
return update_status
|
||||
|
||||
|
||||
@transaction.autocommit
|
||||
@@ -411,52 +432,11 @@ def delete_problem_module_state(xmodule_instance_args, _module_descriptor, stude
|
||||
"""
|
||||
Delete the StudentModule entry.
|
||||
|
||||
Always returns true, indicating success, if it doesn't raise an exception due to database error.
|
||||
Always returns UPDATE_STATUS_SUCCEEDED, indicating success, if it doesn't raise an exception due to database error.
|
||||
"""
|
||||
student_module.delete()
|
||||
# get request-related tracking information from args passthrough,
|
||||
# and supplement with task-specific information:
|
||||
track_function = _get_track_function_for_task(student_module.student, xmodule_instance_args)
|
||||
track_function('problem_delete_state', {})
|
||||
return True
|
||||
|
||||
|
||||
#def perform_delegate_email_batches(entry_id, course_id, task_input, action_name):
|
||||
# """
|
||||
# """
|
||||
# # Get start time for task:
|
||||
# start_time = time()
|
||||
#
|
||||
# # perform the main loop
|
||||
# num_updated = 0
|
||||
# num_attempted = 0
|
||||
# num_total = enrolled_students.count()
|
||||
#
|
||||
# def get_task_progress():
|
||||
# """Return a dict containing info about current task"""
|
||||
# current_time = time()
|
||||
# progress = {'action_name': action_name,
|
||||
# 'attempted': num_attempted,
|
||||
# 'updated': num_updated,
|
||||
# 'total': num_total,
|
||||
# 'duration_ms': int((current_time - start_time) * 1000),
|
||||
# }
|
||||
# return progress
|
||||
#
|
||||
# task_progress = get_task_progress()
|
||||
# _get_current_task().update_state(state=PROGRESS, meta=task_progress)
|
||||
# for enrolled_student in enrolled_students:
|
||||
# num_attempted += 1
|
||||
# # There is no try here: if there's an error, we let it throw, and the task will
|
||||
# # be marked as FAILED, with a stack trace.
|
||||
# with dog_stats_api.timer('instructor_tasks.student.time.step', tags=['action:{name}'.format(name=action_name)]):
|
||||
# if update_fcn(course_descriptor, enrolled_student):
|
||||
# # If the update_fcn returns true, then it performed some kind of work.
|
||||
# # Logging of failures is left to the update_fcn itself.
|
||||
# num_updated += 1
|
||||
#
|
||||
# # update task status:
|
||||
# task_progress = get_task_progress()
|
||||
# _get_current_task().update_state(state=PROGRESS, meta=task_progress)
|
||||
#
|
||||
# return task_progress
|
||||
return UPDATE_STATUS_SUCCEEDED
|
||||
|
||||
@@ -88,7 +88,7 @@ class InstructorTaskTestCase(TestCase):
|
||||
def _create_progress_entry(self, student=None, task_state=PROGRESS):
|
||||
"""Creates a InstructorTask entry representing a task in progress."""
|
||||
progress = {'attempted': 3,
|
||||
'updated': 2,
|
||||
'succeeded': 2,
|
||||
'total': 5,
|
||||
'action_name': 'rescored',
|
||||
}
|
||||
@@ -120,6 +120,7 @@ class InstructorTaskModuleTestCase(LoginEnrollmentTestCase, ModuleStoreTestCase)
|
||||
# add a sequence to the course to which the problems can be added
|
||||
self.problem_section = ItemFactory.create(parent_location=chapter.location,
|
||||
category='sequential',
|
||||
metadata={'graded': True, 'format': 'Homework'},
|
||||
display_name=TEST_SECTION_NAME)
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -227,7 +227,7 @@ class TestRescoringTask(TestIntegrationTask):
|
||||
self.assertEqual(task_input['problem_url'], InstructorTaskModuleTestCase.problem_location(problem_url_name))
|
||||
status = json.loads(instructor_task.task_output)
|
||||
self.assertEqual(status['attempted'], 1)
|
||||
self.assertEqual(status['updated'], 0)
|
||||
self.assertEqual(status['succeeded'], 0)
|
||||
self.assertEqual(status['total'], 1)
|
||||
|
||||
def define_code_response_problem(self, problem_url_name):
|
||||
|
||||
@@ -104,14 +104,14 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
|
||||
def test_delete_undefined_problem(self):
|
||||
self._test_undefined_problem(delete_problem_state)
|
||||
|
||||
def _test_run_with_task(self, task_function, action_name, expected_num_updated):
|
||||
def _test_run_with_task(self, task_function, action_name, expected_num_succeeded):
|
||||
"""Run a task and check the number of StudentModules processed."""
|
||||
task_entry = self._create_input_entry()
|
||||
status = self._run_task_with_mock_celery(task_function, task_entry.id, task_entry.task_id)
|
||||
# check return value
|
||||
self.assertEquals(status.get('attempted'), expected_num_updated)
|
||||
self.assertEquals(status.get('updated'), expected_num_updated)
|
||||
self.assertEquals(status.get('total'), expected_num_updated)
|
||||
self.assertEquals(status.get('attempted'), expected_num_succeeded)
|
||||
self.assertEquals(status.get('succeeded'), expected_num_succeeded)
|
||||
self.assertEquals(status.get('total'), expected_num_succeeded)
|
||||
self.assertEquals(status.get('action_name'), action_name)
|
||||
self.assertGreater('duration_ms', 0)
|
||||
# compare with entry in table:
|
||||
@@ -209,7 +209,7 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
|
||||
status = self._run_task_with_mock_celery(reset_problem_attempts, task_entry.id, task_entry.task_id)
|
||||
# check return value
|
||||
self.assertEquals(status.get('attempted'), 1)
|
||||
self.assertEquals(status.get('updated'), 1)
|
||||
self.assertEquals(status.get('succeeded'), 1)
|
||||
self.assertEquals(status.get('total'), 1)
|
||||
self.assertEquals(status.get('action_name'), 'reset')
|
||||
self.assertGreater('duration_ms', 0)
|
||||
@@ -371,7 +371,7 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
|
||||
entry = InstructorTask.objects.get(id=task_entry.id)
|
||||
output = json.loads(entry.task_output)
|
||||
self.assertEquals(output.get('attempted'), num_students)
|
||||
self.assertEquals(output.get('updated'), num_students)
|
||||
self.assertEquals(output.get('succeeded'), num_students)
|
||||
self.assertEquals(output.get('total'), num_students)
|
||||
self.assertEquals(output.get('action_name'), 'rescored')
|
||||
self.assertGreater('duration_ms', 0)
|
||||
|
||||
@@ -84,7 +84,7 @@ class InstructorTaskReportTest(InstructorTaskTestCase):
|
||||
self.assertEquals(output['task_state'], SUCCESS)
|
||||
self.assertFalse(output['in_progress'])
|
||||
expected_progress = {'attempted': 3,
|
||||
'updated': 2,
|
||||
'succeeded': 2,
|
||||
'total': 5,
|
||||
'action_name': 'rescored'}
|
||||
self.assertEquals(output['task_progress'], expected_progress)
|
||||
@@ -121,7 +121,7 @@ class InstructorTaskReportTest(InstructorTaskTestCase):
|
||||
mock_result.task_id = task_id
|
||||
mock_result.state = PROGRESS
|
||||
mock_result.result = {'attempted': 5,
|
||||
'updated': 4,
|
||||
'succeeded': 4,
|
||||
'total': 10,
|
||||
'action_name': 'rescored'}
|
||||
output = self._test_get_status_from_result(task_id, mock_result)
|
||||
@@ -165,7 +165,7 @@ class InstructorTaskReportTest(InstructorTaskTestCase):
|
||||
expected_progress = {'message': "Task revoked before running"}
|
||||
self.assertEquals(output['task_progress'], expected_progress)
|
||||
|
||||
def _get_output_for_task_success(self, attempted, updated, total, student=None):
|
||||
def _get_output_for_task_success(self, attempted, succeeded, total, student=None):
|
||||
"""returns the task_id and the result returned by instructor_task_status()."""
|
||||
# view task entry for task in progress
|
||||
instructor_task = self._create_progress_entry(student)
|
||||
@@ -174,7 +174,7 @@ class InstructorTaskReportTest(InstructorTaskTestCase):
|
||||
mock_result.task_id = task_id
|
||||
mock_result.state = SUCCESS
|
||||
mock_result.result = {'attempted': attempted,
|
||||
'updated': updated,
|
||||
'succeeded': succeeded,
|
||||
'total': total,
|
||||
'action_name': 'rescored'}
|
||||
output = self._test_get_status_from_result(task_id, mock_result)
|
||||
@@ -187,7 +187,7 @@ class InstructorTaskReportTest(InstructorTaskTestCase):
|
||||
self.assertEquals(output['task_state'], SUCCESS)
|
||||
self.assertFalse(output['in_progress'])
|
||||
expected_progress = {'attempted': 10,
|
||||
'updated': 8,
|
||||
'succeeded': 8,
|
||||
'total': 10,
|
||||
'action_name': 'rescored'}
|
||||
self.assertEquals(output['task_progress'], expected_progress)
|
||||
|
||||
@@ -65,7 +65,7 @@ def instructor_task_status(request):
|
||||
'in_progress': boolean indicating if task is still running.
|
||||
'task_progress': dict containing progress information. This includes:
|
||||
'attempted': number of attempts made
|
||||
'updated': number of attempts that "succeeded"
|
||||
'succeeded': number of attempts that "succeeded"
|
||||
'total': number of possible subtasks to attempt
|
||||
'action_name': user-visible verb to use in status messages. Should be past-tense.
|
||||
'duration_ms': how long the task has (or had) been running.
|
||||
@@ -122,16 +122,20 @@ def get_task_completion_info(instructor_task):
|
||||
if instructor_task.task_state in [FAILURE, REVOKED]:
|
||||
return (succeeded, task_output.get('message', 'No message provided'))
|
||||
|
||||
if any([key not in task_output for key in ['action_name', 'attempted', 'updated', 'total']]):
|
||||
if any([key not in task_output for key in ['action_name', 'attempted', 'total']]):
|
||||
fmt = "Invalid task_output information found for instructor_task {0}: {1}"
|
||||
log.warning(fmt.format(instructor_task.task_id, instructor_task.task_output))
|
||||
return (succeeded, "No progress status information available")
|
||||
|
||||
action_name = task_output['action_name']
|
||||
num_attempted = task_output['attempted']
|
||||
num_updated = task_output['updated']
|
||||
num_total = task_output['total']
|
||||
|
||||
# old tasks may still have 'updated' instead of the preferred 'succeeded':
|
||||
num_succeeded = task_output.get('updated', 0) + task_output.get('succeeded', 0)
|
||||
num_skipped = task_output.get('skipped', 0)
|
||||
# num_failed = task_output.get('failed', 0)
|
||||
|
||||
student = None
|
||||
problem_url = None
|
||||
email_id = None
|
||||
@@ -147,12 +151,12 @@ def get_task_completion_info(instructor_task):
|
||||
|
||||
if instructor_task.task_state == PROGRESS:
|
||||
# special message for providing progress updates:
|
||||
msg_format = "Progress: {action} {updated} of {attempted} so far"
|
||||
msg_format = "Progress: {action} {succeeded} of {attempted} so far"
|
||||
elif student is not None and problem_url is not None:
|
||||
# this reports on actions on problems for a particular student:
|
||||
if num_attempted == 0:
|
||||
msg_format = "Unable to find submission to be {action} for student '{student}'"
|
||||
elif num_updated == 0:
|
||||
elif num_succeeded == 0:
|
||||
msg_format = "Problem failed to be {action} for student '{student}'"
|
||||
else:
|
||||
succeeded = True
|
||||
@@ -161,33 +165,40 @@ def get_task_completion_info(instructor_task):
|
||||
# this reports on actions on problems for all students:
|
||||
if num_attempted == 0:
|
||||
msg_format = "Unable to find any students with submissions to be {action}"
|
||||
elif num_updated == 0:
|
||||
elif num_succeeded == 0:
|
||||
msg_format = "Problem failed to be {action} for any of {attempted} students"
|
||||
elif num_updated == num_attempted:
|
||||
elif num_succeeded == num_attempted:
|
||||
succeeded = True
|
||||
msg_format = "Problem successfully {action} for {attempted} students"
|
||||
else: # num_updated < num_attempted
|
||||
msg_format = "Problem {action} for {updated} of {attempted} students"
|
||||
else: # num_succeeded < num_attempted
|
||||
msg_format = "Problem {action} for {succeeded} of {attempted} students"
|
||||
elif email_id is not None:
|
||||
# this reports on actions on bulk emails
|
||||
if num_attempted == 0:
|
||||
msg_format = "Unable to find any recipients to be {action}"
|
||||
elif num_updated == 0:
|
||||
elif num_succeeded == 0:
|
||||
msg_format = "Message failed to be {action} for any of {attempted} recipients "
|
||||
elif num_updated == num_attempted:
|
||||
elif num_succeeded == num_attempted:
|
||||
succeeded = True
|
||||
msg_format = "Message successfully {action} for {attempted} recipients"
|
||||
else: # num_updated < num_attempted
|
||||
msg_format = "Message {action} for {updated} of {attempted} recipients"
|
||||
else: # num_succeeded < num_attempted
|
||||
msg_format = "Message {action} for {succeeded} of {attempted} recipients"
|
||||
else:
|
||||
# provide a default:
|
||||
msg_format = "Status: {action} {updated} of {attempted}"
|
||||
msg_format = "Status: {action} {succeeded} of {attempted}"
|
||||
|
||||
if num_skipped > 0:
|
||||
msg_format += " (skipping {skipped})"
|
||||
|
||||
if student is None and num_attempted != num_total:
|
||||
msg_format += " (out of {total})"
|
||||
|
||||
# Update status in task result object itself:
|
||||
message = msg_format.format(action=action_name, updated=num_updated,
|
||||
attempted=num_attempted, total=num_total,
|
||||
student=student)
|
||||
message = msg_format.format(
|
||||
action=action_name,
|
||||
succeeded=num_succeeded,
|
||||
attempted=num_attempted,
|
||||
total=num_total,
|
||||
skipped=num_skipped,
|
||||
student=student)
|
||||
return (succeeded, message)
|
||||
|
||||
@@ -507,6 +507,13 @@ function goto( mode)
|
||||
return true;
|
||||
}
|
||||
</script>
|
||||
|
||||
<p>These email actions run in the background, and status for active email tasks will appear in a table below.
|
||||
To see status for all bulk email tasks submitted for this course, click on this button:
|
||||
</p>
|
||||
<p>
|
||||
<input type="submit" name="action" value="Show Background Email Task History">
|
||||
</p>
|
||||
%endif
|
||||
|
||||
</form>
|
||||
|
||||
Reference in New Issue
Block a user