Use separate retry count for calculating retry delay.
This commit is contained in:
@@ -12,8 +12,14 @@ from sys import exc_info
|
||||
from traceback import format_exc
|
||||
|
||||
from dogapi import dog_stats_api
|
||||
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError
|
||||
from boto.ses.exceptions import SESDailyQuotaExceededError, SESMaxSendingRateExceededError
|
||||
from smtplib import SMTPServerDisconnected, SMTPDataError, SMTPConnectError, SMTPException
|
||||
from boto.ses.exceptions import (
|
||||
SESDailyQuotaExceededError,
|
||||
SESMaxSendingRateExceededError,
|
||||
SESAddressBlacklistedError,
|
||||
SESIllegalAddressError,
|
||||
SESLocalAddressCharacterError,
|
||||
)
|
||||
from boto.exception import AWSConnectionError
|
||||
|
||||
from celery import task, current_task, group
|
||||
@@ -44,18 +50,25 @@ from instructor_task.subtasks import (
|
||||
log = get_task_logger(__name__)
|
||||
|
||||
|
||||
# Errors that an individual email is failing to be sent, and should just
|
||||
# be treated as a fail.
|
||||
SINGLE_EMAIL_FAILURE_ERRORS = (SESAddressBlacklistedError, SESIllegalAddressError, SESLocalAddressCharacterError)
|
||||
|
||||
# Exceptions that, if caught, should cause the task to be re-tried.
|
||||
# These errors will be caught a maximum of 5 times before the task fails.
|
||||
RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError)
|
||||
# These errors will be caught a limited number of times before the task fails.
|
||||
LIMITED_RETRY_ERRORS = (SMTPDataError, SMTPConnectError, SMTPServerDisconnected, AWSConnectionError)
|
||||
|
||||
# Errors that involve exceeding a quota of sent email
|
||||
QUOTA_EXCEEDED_ERRORS = (SESDailyQuotaExceededError, )
|
||||
|
||||
# Errors that mail is being sent too quickly. When caught by a task, it
|
||||
# triggers an exponential backoff and retry. Retries happen continuously until
|
||||
# the email is sent.
|
||||
# Errors that indicate that a mailing task should be retried without limit.
|
||||
# An example is if email is being sent too quickly, but may succeed if sent
|
||||
# more slowly. When caught by a task, it triggers an exponential backoff and retry.
|
||||
# Retries happen continuously until the email is sent.
|
||||
INFINITE_RETRY_ERRORS = (SESMaxSendingRateExceededError, )
|
||||
|
||||
# Errors that are known to indicate an inability to send any more emails,
|
||||
# and should therefore not be retried. For example, exceeding a quota for emails.
|
||||
# Also, any SMTP errors that are not explicitly enumerated above.
|
||||
BULK_EMAIL_FAILURE_ERRORS = (SESDailyQuotaExceededError, SMTPException)
|
||||
|
||||
|
||||
def _get_recipient_queryset(user_id, to_option, course_id, course_location):
|
||||
"""
|
||||
@@ -118,12 +131,14 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
"""
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
# get inputs to use in this task from the entry:
|
||||
#task_id = entry.task_id
|
||||
user_id = entry.requester.id
|
||||
task_id = entry.task_id
|
||||
|
||||
# TODO: check this against argument passed in?
|
||||
# course_id = entry.course_id
|
||||
# perfunctory check, since expansion is made for convenience of other task
|
||||
# code that doesn't need the entry_id.
|
||||
if course_id != entry.course_id:
|
||||
format_msg = "Course id conflict: explicit value %s does not match task value %s"
|
||||
raise ValueError(format_msg.format(course_id, entry.course_id))
|
||||
|
||||
email_id = task_input['email_id']
|
||||
try:
|
||||
@@ -138,15 +153,16 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
|
||||
to_option = email_obj.to_option
|
||||
|
||||
# TODO: instead of fetching from email object, compare instead to
|
||||
# confirm that they match, and raise an exception if they don't.
|
||||
# course_id = email_obj.course_id
|
||||
# sanity check that course for email_obj matches that of the task referencing it:
|
||||
if course_id != email_obj.course_id:
|
||||
format_msg = "Course id conflict: explicit value %s does not match email value %s"
|
||||
raise ValueError(format_msg.format(course_id, email_obj.course_id))
|
||||
|
||||
try:
|
||||
course = get_course_by_id(course_id, depth=1)
|
||||
except Http404 as exc:
|
||||
log.exception("Task %s: get_course_by_id failed: %s", task_id, exc.args[0])
|
||||
raise Exception("get_course_by_id failed: " + exc.args[0])
|
||||
raise ValueError("Course not found: " + exc.args[0])
|
||||
|
||||
global_email_context = _get_course_email_context(course)
|
||||
recipient_qset = _get_recipient_queryset(user_id, to_option, course_id, course.location)
|
||||
@@ -173,23 +189,26 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
for i in range(num_tasks_this_query):
|
||||
if i == num_tasks_this_query - 1:
|
||||
# Avoid cutting off the very last email when chunking a task that divides perfectly
|
||||
# (eg num_emails_this_query = 297 and EMAILS_PER_TASK is 100)
|
||||
# (e.g. num_emails_this_query = 297 and EMAILS_PER_TASK is 100)
|
||||
to_list = recipient_sublist[i * chunk:]
|
||||
else:
|
||||
to_list = recipient_sublist[i * chunk:i * chunk + chunk]
|
||||
subtask_id = str(uuid4())
|
||||
subtask_id_list.append(subtask_id)
|
||||
retry_progress = create_subtask_status()
|
||||
task_list.append(send_course_email.subtask((
|
||||
entry_id,
|
||||
email_id,
|
||||
to_list,
|
||||
global_email_context,
|
||||
retry_progress,
|
||||
),
|
||||
task_id=subtask_id,
|
||||
routing_key=settings.HIGH_PRIORITY_QUEUE,
|
||||
))
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
# create subtask, passing args and kwargs:
|
||||
new_subtask = send_course_email.subtask(
|
||||
(
|
||||
entry_id,
|
||||
email_id,
|
||||
to_list,
|
||||
global_email_context,
|
||||
subtask_status,
|
||||
),
|
||||
task_id=subtask_id,
|
||||
routing_key=settings.BULK_EMAIL_ROUTING_KEY,
|
||||
)
|
||||
task_list.append(new_subtask)
|
||||
num_emails_queued += num_emails_this_query
|
||||
|
||||
# Sanity check: we expect the chunking to be properly summing to the original count:
|
||||
@@ -208,7 +227,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
|
||||
# now group the subtasks, and start them running:
|
||||
task_group = group(task_list)
|
||||
task_group.apply_async(routing_key=settings.HIGH_PRIORITY_QUEUE)
|
||||
task_group.apply_async(routing_key=settings.BULK_EMAIL_ROUTING_KEY)
|
||||
|
||||
# We want to return progress here, as this is what will be stored in the
|
||||
# AsyncResult for the parent task as its return value.
|
||||
@@ -218,13 +237,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
|
||||
return progress
|
||||
|
||||
|
||||
# TODO: figure out if we really need this after all (for unit tests...)
|
||||
def _get_current_task():
|
||||
"""Stub to make it easier to test without actually running Celery"""
|
||||
return current_task
|
||||
|
||||
|
||||
@task(default_retry_delay=15, max_retries=5) # pylint: disable=E1102
|
||||
@task(default_retry_delay=settings.BULK_EMAIL_DEFAULT_RETRY_DELAY, max_retries=settings.BULK_EMAIL_MAX_RETRIES) # pylint: disable=E1102
|
||||
def send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status):
|
||||
"""
|
||||
Sends an email to a list of recipients.
|
||||
@@ -249,8 +262,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
# with it right away, but we also don't expect it to fail.
|
||||
InstructorTask.objects.get(pk=entry_id)
|
||||
|
||||
# Get information from current task's request:
|
||||
current_task_id = _get_current_task().request.id
|
||||
current_task_id = subtask_status['task_id']
|
||||
num_to_send = len(to_list)
|
||||
log.info("Preparing to send email %s to %d recipients as subtask %s for instructor task %d: context = %s, status=%s",
|
||||
email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status)
|
||||
@@ -295,6 +307,7 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
|
||||
update_subtask_status(entry_id, current_task_id, new_subtask_status)
|
||||
raise send_exception
|
||||
|
||||
log.info("background task (%s) returning status %s", current_task_id, new_subtask_status)
|
||||
return new_subtask_status
|
||||
|
||||
|
||||
@@ -332,12 +345,14 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
'failed' count above.
|
||||
"""
|
||||
# Get information from current task's request:
|
||||
task_id = _get_current_task().request.id
|
||||
retry_index = _get_current_task().request.retries
|
||||
#task_id = _get_current_task().request.id
|
||||
#retry_index = _get_current_task().request.retries
|
||||
task_id = subtask_status['task_id']
|
||||
|
||||
# If this is a second attempt, then throttle the speed at which mail is sent:
|
||||
throttle = retry_index > 0
|
||||
throttle = subtask_status['retried_nomax'] > 0
|
||||
|
||||
# collect stats on progress:
|
||||
num_optout = 0
|
||||
num_sent = 0
|
||||
num_error = 0
|
||||
@@ -354,7 +369,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
# attempt. Anyone on the to_list on a retry has already passed the filter
|
||||
# that existed at that time, and we don't need to keep checking for changes
|
||||
# in the Optout list.
|
||||
if retry_index == 0:
|
||||
if (subtask_status['retried_nomax'] + subtask_status['retried_withmax']) == 0:
|
||||
optouts = (Optout.objects.filter(course_id=course_email.course_id,
|
||||
user__in=[i['pk'] for i in to_list])
|
||||
.values_list('user__email', flat=True))
|
||||
@@ -403,7 +418,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
)
|
||||
email_msg.attach_alternative(html_msg, 'text/html')
|
||||
|
||||
# Throttle if we tried a few times and got the rate limiter
|
||||
# Throttle if we have gotten the rate limiter
|
||||
if throttle:
|
||||
sleep(0.2)
|
||||
|
||||
@@ -413,11 +428,6 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
with dog_stats_api.timer('course_email.single_send.time.overall', tags=[_statsd_tag(course_title)]):
|
||||
connection.send_messages([email_msg])
|
||||
|
||||
dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)])
|
||||
|
||||
log.info('Email with id %s sent to %s', email_id, email)
|
||||
num_sent += 1
|
||||
|
||||
except SMTPDataError as exc:
|
||||
# According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure.
|
||||
if exc.smtp_code >= 400 and exc.smtp_code < 500:
|
||||
@@ -429,52 +439,56 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
|
||||
num_error += 1
|
||||
|
||||
except SINGLE_EMAIL_FAILURE_ERRORS as exc:
|
||||
# This will fall through and not retry the message, since it will be popped
|
||||
log.warning('Task %s: email with id %s not delivered to %s due to error %s', task_id, email_id, email, exc)
|
||||
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
|
||||
num_error += 1
|
||||
|
||||
else:
|
||||
dog_stats_api.increment('course_email.sent', tags=[_statsd_tag(course_title)])
|
||||
|
||||
log.info('Email with id %s sent to %s', email_id, email)
|
||||
num_sent += 1
|
||||
|
||||
# Pop the user that was emailed off the end of the list:
|
||||
to_list.pop()
|
||||
|
||||
except INFINITE_RETRY_ERRORS as exc:
|
||||
dog_stats_api.increment('course_email.infinite_retry', tags=[_statsd_tag(course_title)])
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=num_error,
|
||||
skipped=num_optout,
|
||||
retriedA=1,
|
||||
retried_nomax=1,
|
||||
state=RETRY
|
||||
)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, True
|
||||
)
|
||||
|
||||
except RETRY_ERRORS as exc:
|
||||
except LIMITED_RETRY_ERRORS as exc:
|
||||
# Errors caught here cause the email to be retried. The entire task is actually retried
|
||||
# without popping the current recipient off of the existing list.
|
||||
# Errors caught are those that indicate a temporary condition that might succeed on retry.
|
||||
dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=num_error,
|
||||
skipped=num_optout,
|
||||
retriedB=1,
|
||||
retried_withmax=1,
|
||||
state=RETRY
|
||||
)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False
|
||||
)
|
||||
|
||||
except Exception as exc:
|
||||
|
||||
# If we have a general exception for this request, we need to figure out what to do with it.
|
||||
# If we're going to just mark it as failed
|
||||
# And the log message below should indicate which task_id is failing, so we have a chance to
|
||||
# reconstruct the problems.
|
||||
if isinstance(exc, QUOTA_EXCEEDED_ERRORS):
|
||||
log.exception('WARNING: Course "%s" exceeded quota!', course_title)
|
||||
log.exception('Email with id %d not sent due to exceeding quota. To list: %s',
|
||||
email_id,
|
||||
[i['email'] for i in to_list])
|
||||
else:
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail with uncaught exception. To list: %s',
|
||||
task_id, email_id, [i['email'] for i in to_list])
|
||||
except BULK_EMAIL_FAILURE_ERRORS as exc:
|
||||
dog_stats_api.increment('course_email.error', tags=[_statsd_tag(course_title)])
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail with "fatal" exception. To list: %s',
|
||||
task_id, email_id, [i['email'] for i in to_list])
|
||||
num_error += len(to_list)
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
@@ -484,6 +498,27 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
state=FAILURE
|
||||
)
|
||||
return subtask_progress, exc
|
||||
|
||||
except Exception as exc:
|
||||
# Errors caught here cause the email to be retried. The entire task is actually retried
|
||||
# without popping the current recipient off of the existing list.
|
||||
# These are unexpected errors. Since they might be due to a temporary condition that might
|
||||
# succeed on retry, we give them a retry.
|
||||
dog_stats_api.increment('course_email.limited_retry', tags=[_statsd_tag(course_title)])
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail with unexpected exception. Generating retry.',
|
||||
task_id, email_id)
|
||||
subtask_progress = increment_subtask_status(
|
||||
subtask_status,
|
||||
succeeded=num_sent,
|
||||
failed=num_error,
|
||||
skipped=num_optout,
|
||||
retried_withmax=1,
|
||||
state=RETRY
|
||||
)
|
||||
return _submit_for_retry(
|
||||
entry_id, email_id, to_list, global_email_context, exc, subtask_progress, False
|
||||
)
|
||||
|
||||
else:
|
||||
# Successful completion is marked by an exception value of None:
|
||||
subtask_progress = increment_subtask_status(
|
||||
@@ -499,13 +534,18 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
|
||||
connection.close()
|
||||
|
||||
|
||||
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_progress, is_sending_rate_error):
|
||||
def _get_current_task():
|
||||
"""Stub to make it easier to test without actually running Celery"""
|
||||
return current_task
|
||||
|
||||
|
||||
def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current_exception, subtask_status, is_sending_rate_error):
|
||||
"""
|
||||
Helper function to requeue a task for retry, using the new version of arguments provided.
|
||||
|
||||
Inputs are the same as for running a task, plus two extra indicating the state at the time of retry.
|
||||
These include the `current_exception` that the task encountered that is causing the retry attempt,
|
||||
and the `subtask_progress` that is to be returned.
|
||||
and the `subtask_status` that is to be returned.
|
||||
|
||||
Returns a tuple of two values:
|
||||
* First value is a dict which represents current progress. Keys are:
|
||||
@@ -519,27 +559,29 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
successfully submitted, this value will be the RetryTaskError that retry() returns.
|
||||
Otherwise, it (ought to be) the current_exception passed in.
|
||||
"""
|
||||
task_id = _get_current_task().request.id
|
||||
retry_index = _get_current_task().request.retries
|
||||
|
||||
# task_id = _get_current_task().request.id
|
||||
task_id = subtask_status['task_id']
|
||||
log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
|
||||
current_task.request.id, subtask_progress['succeeded'], subtask_progress['failed'], subtask_progress['skipped'])
|
||||
task_id, subtask_status['succeeded'], subtask_status['failed'], subtask_status['skipped'])
|
||||
|
||||
# Calculate time until we retry this task (in seconds):
|
||||
max_retries = _get_current_task().max_retries + subtask_status['retried_nomax']
|
||||
base_delay = _get_current_task().default_retry_delay
|
||||
if is_sending_rate_error:
|
||||
retry_index = subtask_status['retried_nomax']
|
||||
exp = min(retry_index, 5)
|
||||
countdown = ((2 ** exp) * 15) * random.uniform(.5, 1.25)
|
||||
countdown = ((2 ** exp) * base_delay) * random.uniform(.5, 1.25)
|
||||
exception_type = 'sending-rate'
|
||||
else:
|
||||
countdown = ((2 ** retry_index) * 15) * random.uniform(.75, 1.5)
|
||||
retry_index = subtask_status['retried_withmax']
|
||||
countdown = ((2 ** retry_index) * base_delay) * random.uniform(.75, 1.5)
|
||||
exception_type = 'transient'
|
||||
|
||||
# max_retries is increased by the number of times an "infinite-retry" exception
|
||||
# has been retried. We want the regular retries to trigger a retry, but not these
|
||||
# has been retried. We want the regular retries to trigger max-retry checking, but not these
|
||||
# special retries. So we count them separately.
|
||||
max_retries = _get_current_task().max_retries + subtask_progress['retriedA']
|
||||
log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients (with max_retry=%s)',
|
||||
task_id, email_id, exception_type, current_exception, len(to_list), max_retries)
|
||||
log.warning('Task %s: email with id %d not delivered due to %s error %s, retrying send to %d recipients in %s seconds (with max_retry=%s)',
|
||||
task_id, email_id, exception_type, current_exception, len(to_list), countdown, max_retries)
|
||||
|
||||
try:
|
||||
send_course_email.retry(
|
||||
@@ -548,7 +590,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
email_id,
|
||||
to_list,
|
||||
global_email_context,
|
||||
subtask_progress,
|
||||
subtask_status,
|
||||
],
|
||||
exc=current_exception,
|
||||
countdown=countdown,
|
||||
@@ -559,7 +601,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
# If retry call is successful, update with the current progress:
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to retry.',
|
||||
task_id, email_id)
|
||||
return subtask_progress, retry_error
|
||||
return subtask_status, retry_error
|
||||
except Exception as retry_exc:
|
||||
# If there are no more retries, because the maximum has been reached,
|
||||
# we expect the original exception to be raised. We catch it here
|
||||
@@ -569,7 +611,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context, current
|
||||
log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
|
||||
task_id, email_id, [i['email'] for i in to_list])
|
||||
num_failed = len(to_list)
|
||||
new_subtask_progress = increment_subtask_status(subtask_progress, failed=num_failed, state=FAILURE)
|
||||
new_subtask_progress = increment_subtask_status(subtask_status, failed=num_failed, state=FAILURE)
|
||||
return new_subtask_progress, retry_exc
|
||||
|
||||
|
||||
|
||||
@@ -4,7 +4,6 @@ Unit tests for handling email sending errors
|
||||
from itertools import cycle
|
||||
from mock import patch, Mock
|
||||
from smtplib import SMTPDataError, SMTPServerDisconnected, SMTPConnectError
|
||||
from unittest import skip
|
||||
|
||||
from django.test.utils import override_settings
|
||||
from django.conf import settings
|
||||
@@ -93,9 +92,9 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
# Test that after the rejected email, the rest still successfully send
|
||||
((_initial_results), kwargs) = result.call_args
|
||||
self.assertEquals(kwargs['skipped'], 0)
|
||||
expectedNumFails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
|
||||
self.assertEquals(kwargs['failed'], expectedNumFails)
|
||||
self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expectedNumFails)
|
||||
expected_fails = int((settings.EMAILS_PER_TASK + 3) / 4.0)
|
||||
self.assertEquals(kwargs['failed'], expected_fails)
|
||||
self.assertEquals(kwargs['succeeded'], settings.EMAILS_PER_TASK - expected_fails)
|
||||
|
||||
@patch('bulk_email.tasks.get_connection', autospec=True)
|
||||
@patch('bulk_email.tasks.send_course_email.retry')
|
||||
@@ -144,7 +143,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
@patch('bulk_email.tasks.get_connection', Mock(return_value=EmailTestException))
|
||||
def test_general_exception(self, mock_log, retry, result):
|
||||
"""
|
||||
Tests the if the error is not SMTP-related, we log and reraise
|
||||
Tests the if the error is unexpected, we log and retry
|
||||
"""
|
||||
test_email = {
|
||||
'action': 'Send email',
|
||||
@@ -156,11 +155,10 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
# so we assert on the arguments of log.exception
|
||||
self.client.post(self.url, test_email)
|
||||
self.assertTrue(mock_log.exception.called)
|
||||
((log_str, _task_id, email_id, to_list), _) = mock_log.exception.call_args
|
||||
self.assertIn('caused send_course_email task to fail with uncaught exception.', log_str)
|
||||
((log_str, _task_id, email_id), _) = mock_log.exception.call_args
|
||||
self.assertIn('caused send_course_email task to fail with unexpected exception.', log_str)
|
||||
self.assertEqual(email_id, 1)
|
||||
self.assertEqual(to_list, [self.instructor.email])
|
||||
self.assertFalse(retry.called)
|
||||
self.assertTrue(retry.called)
|
||||
# check the results being returned
|
||||
self.assertTrue(result.called)
|
||||
((initial_results, ), kwargs) = result.call_args
|
||||
@@ -180,7 +178,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor)
|
||||
task_input = {"email_id": -1}
|
||||
with self.assertRaises(CourseEmail.DoesNotExist):
|
||||
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name")
|
||||
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101
|
||||
((log_str, _, email_id), _) = mock_log.warning.call_args
|
||||
self.assertTrue(mock_log.warning.called)
|
||||
self.assertIn('Failed to get CourseEmail with id', log_str)
|
||||
@@ -196,9 +194,9 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
email = CourseEmail(course_id=course_id)
|
||||
email.save()
|
||||
entry = InstructorTask.create(course_id, "task_type", "task_key", "task_input", self.instructor)
|
||||
task_input = {"email_id": email.id}
|
||||
task_input = {"email_id": email.id} # pylint: disable=E1101
|
||||
with self.assertRaises(Exception):
|
||||
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name")
|
||||
perform_delegate_email_batches(entry.id, course_id, task_input, "action_name") # pylint: disable=E1101
|
||||
((log_str, _, _), _) = mock_log.exception.call_args
|
||||
self.assertTrue(mock_log.exception.called)
|
||||
self.assertIn('get_course_by_id failed:', log_str)
|
||||
@@ -211,9 +209,9 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
email = CourseEmail(course_id=self.course.id, to_option="IDONTEXIST")
|
||||
email.save()
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
task_input = {"email_id": email.id}
|
||||
task_input = {"email_id": email.id} # pylint: disable=E1101
|
||||
with self.assertRaises(Exception):
|
||||
perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name")
|
||||
perform_delegate_email_batches(entry.id, self.course.id, task_input, "action_name") # pylint: disable=E1101
|
||||
((log_str, opt_str), _) = mock_log.error.call_args
|
||||
self.assertTrue(mock_log.error.called)
|
||||
self.assertIn('Unexpected bulk email TO_OPTION found', log_str)
|
||||
|
||||
162
lms/djangoapps/bulk_email/tests/test_tasks.py
Normal file
162
lms/djangoapps/bulk_email/tests/test_tasks.py
Normal file
@@ -0,0 +1,162 @@
|
||||
"""
|
||||
Unit tests for LMS instructor-initiated background tasks.
|
||||
|
||||
Runs tasks on answers to course problems to validate that code
|
||||
paths actually work.
|
||||
|
||||
"""
|
||||
import json
|
||||
from uuid import uuid4
|
||||
from itertools import cycle
|
||||
from mock import patch, Mock
|
||||
from smtplib import SMTPDataError, SMTPServerDisconnected
|
||||
|
||||
from celery.states import SUCCESS
|
||||
|
||||
# from django.test.utils import override_settings
|
||||
from django.conf import settings
|
||||
from django.core.management import call_command
|
||||
|
||||
from bulk_email.models import CourseEmail, SEND_TO_ALL
|
||||
|
||||
# from instructor_task.tests.test_tasks import TestInstructorTasks
|
||||
from instructor_task.tasks import send_bulk_course_email
|
||||
from instructor_task.models import InstructorTask
|
||||
from instructor_task.tests.test_base import InstructorTaskCourseTestCase
|
||||
from instructor_task.tests.factories import InstructorTaskFactory
|
||||
|
||||
|
||||
class TestTaskFailure(Exception):
|
||||
"""Dummy exception used for unit tests."""
|
||||
pass
|
||||
|
||||
|
||||
class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
|
||||
"""Tests instructor task that send bulk email."""
|
||||
|
||||
def setUp(self):
|
||||
super(TestBulkEmailInstructorTask, self).setUp()
|
||||
self.initialize_course()
|
||||
self.instructor = self.create_instructor('instructor')
|
||||
|
||||
# load initial content (since we don't run migrations as part of tests):
|
||||
call_command("loaddata", "course_email_template.json")
|
||||
|
||||
def _create_input_entry(self, course_id=None):
|
||||
"""
|
||||
Creates a InstructorTask entry for testing.
|
||||
|
||||
Overrides the base class version in that this creates CourseEmail.
|
||||
"""
|
||||
to_option = SEND_TO_ALL
|
||||
course_id = course_id or self.course.id
|
||||
course_email = CourseEmail.create(course_id, self.instructor, to_option, "Test Subject", "<p>This is a test message</p>")
|
||||
task_input = {'email_id': course_email.id}
|
||||
task_id = str(uuid4())
|
||||
instructor_task = InstructorTaskFactory.create(
|
||||
course_id=course_id,
|
||||
requester=self.instructor,
|
||||
task_input=json.dumps(task_input),
|
||||
task_key='dummy value',
|
||||
task_id=task_id,
|
||||
)
|
||||
return instructor_task
|
||||
|
||||
def _run_task_with_mock_celery(self, task_class, entry_id, task_id, expected_failure_message=None):
|
||||
"""Submit a task and mock how celery provides a current_task."""
|
||||
self.current_task = Mock()
|
||||
self.current_task.max_retries = settings.BULK_EMAIL_MAX_RETRIES
|
||||
self.current_task.default_retry_delay = settings.BULK_EMAIL_DEFAULT_RETRY_DELAY
|
||||
task_args = [entry_id, {}]
|
||||
|
||||
with patch('bulk_email.tasks._get_current_task') as mock_get_task:
|
||||
mock_get_task.return_value = self.current_task
|
||||
return task_class.apply(task_args, task_id=task_id).get()
|
||||
|
||||
def test_email_missing_current_task(self):
|
||||
task_entry = self._create_input_entry()
|
||||
with self.assertRaises(ValueError):
|
||||
send_bulk_course_email(task_entry.id, {})
|
||||
|
||||
def test_email_undefined_course(self):
|
||||
# Check that we fail when passing in a course that doesn't exist.
|
||||
task_entry = self._create_input_entry(course_id="bogus/course/id")
|
||||
with self.assertRaises(ValueError):
|
||||
self._run_task_with_mock_celery(send_bulk_course_email, task_entry.id, task_entry.task_id)
|
||||
|
||||
def _create_students(self, num_students):
|
||||
"""Create students, a problem, and StudentModule objects for testing"""
|
||||
students = [
|
||||
self.create_student('robot%d' % i) for i in xrange(num_students)
|
||||
]
|
||||
return students
|
||||
|
||||
def _test_run_with_task(self, task_class, action_name, total, succeeded, failed=0, skipped=0):
|
||||
"""Run a task and check the number of emails processed."""
|
||||
task_entry = self._create_input_entry()
|
||||
parent_status = self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)
|
||||
# check return value
|
||||
self.assertEquals(parent_status.get('total'), total)
|
||||
self.assertEquals(parent_status.get('action_name'), action_name)
|
||||
# compare with entry in table:
|
||||
entry = InstructorTask.objects.get(id=task_entry.id)
|
||||
status = json.loads(entry.task_output)
|
||||
self.assertEquals(status.get('attempted'), succeeded + failed)
|
||||
self.assertEquals(status.get('succeeded'), succeeded)
|
||||
self.assertEquals(status['skipped'], skipped)
|
||||
self.assertEquals(status['failed'], failed)
|
||||
self.assertEquals(status.get('total'), total)
|
||||
self.assertEquals(status.get('action_name'), action_name)
|
||||
self.assertGreater(status.get('duration_ms'), 0)
|
||||
self.assertEquals(entry.task_state, SUCCESS)
|
||||
|
||||
def test_successful(self):
|
||||
num_students = settings.EMAILS_PER_TASK
|
||||
self._create_students(num_students)
|
||||
# we also send email to the instructor:
|
||||
num_emails = num_students + 1
|
||||
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
|
||||
get_conn.return_value.send_messages.side_effect = cycle([None])
|
||||
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, num_emails)
|
||||
|
||||
def test_data_err_fail(self):
|
||||
# Test that celery handles permanent SMTPDataErrors by failing and not retrying.
|
||||
num_students = settings.EMAILS_PER_TASK
|
||||
self._create_students(num_students)
|
||||
# we also send email to the instructor:
|
||||
num_emails = num_students + 1
|
||||
expected_fails = int((num_emails + 3) / 4.0)
|
||||
expected_succeeds = num_emails - expected_fails
|
||||
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
|
||||
# have every fourth email fail due to blacklisting:
|
||||
get_conn.return_value.send_messages.side_effect = cycle([SMTPDataError(554, "Email address is blacklisted"),
|
||||
None, None, None])
|
||||
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails)
|
||||
|
||||
def test_retry_after_limited_retry_error(self):
|
||||
# Test that celery handles connection failures by retrying.
|
||||
num_students = 1
|
||||
self._create_students(num_students)
|
||||
# we also send email to the instructor:
|
||||
num_emails = num_students + 1
|
||||
expected_fails = 0
|
||||
expected_succeeds = num_emails
|
||||
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
|
||||
# have every other mail attempt fail due to disconnection:
|
||||
get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting"), None])
|
||||
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails)
|
||||
|
||||
def test_max_retry(self):
|
||||
# Test that celery can hit a maximum number of retries.
|
||||
num_students = 1
|
||||
self._create_students(num_students)
|
||||
# we also send email to the instructor:
|
||||
num_emails = num_students + 1
|
||||
# This is an ugly hack: the failures that are reported by the EAGER version of retry
|
||||
# are multiplied by the attempted number of retries (equals max plus one).
|
||||
expected_fails = num_emails * (settings.BULK_EMAIL_MAX_RETRIES + 1)
|
||||
expected_succeeds = 0
|
||||
with patch('bulk_email.tasks.get_connection', autospec=True) as get_conn:
|
||||
# always fail to connect, triggering repeated retries until limit is hit:
|
||||
get_conn.return_value.send_messages.side_effect = cycle([SMTPServerDisconnected(425, "Disconnecting")])
|
||||
self._test_run_with_task(send_bulk_course_email, 'emailed', num_emails, expected_succeeds, failed=expected_fails)
|
||||
@@ -720,18 +720,13 @@ def instructor_dashboard(request, course_id):
|
||||
email_subject = request.POST.get("subject")
|
||||
html_message = request.POST.get("message")
|
||||
|
||||
# TODO: make sure this is committed before submitting it to the task.
|
||||
# However, it should probably be enough to do the submit below, which
|
||||
# will commit the transaction for the InstructorTask object. Both should
|
||||
# therefore be committed. (Still, it might be clearer to do so here as well.)
|
||||
# Actually, this should probably be moved out, so that all the validation logic
|
||||
# we might want to add to it can be added. There might also be something
|
||||
# that would permit validation of the email beforehand.
|
||||
# Create the CourseEmail object. This is saved immediately, so that
|
||||
# any transaction that has been pending up to this point will also be
|
||||
# committed.
|
||||
email = CourseEmail.create(course_id, request.user, email_to_option, email_subject, html_message)
|
||||
|
||||
# TODO: make this into a task submission, so that the correct
|
||||
# InstructorTask object gets created (for monitoring purposes)
|
||||
submit_bulk_course_email(request, course_id, email.id)
|
||||
# Submit the task, so that the correct InstructorTask object gets created (for monitoring purposes)
|
||||
submit_bulk_course_email(request, course_id, email.id) # pylint: disable=E1101
|
||||
|
||||
if email_to_option == "all":
|
||||
email_msg = '<div class="msg msg-confirm"><p class="copy">Your email was successfully queued for sending. Please note that for large public classes (~10k), it may take 1-2 hours to send all emails.</p></div>'
|
||||
@@ -1535,7 +1530,6 @@ def get_background_task_table(course_id, problem_url=None, student=None, task_ty
|
||||
# (note that we don't have to check that the arguments are valid; it
|
||||
# just won't find any entries.)
|
||||
if (history_entries.count()) == 0:
|
||||
# TODO: figure out how to deal with task_type better here...
|
||||
if problem_url is None:
|
||||
msg += '<font color="red">Failed to find any background tasks for course "{course}".</font>'.format(course=course_id)
|
||||
elif student is not None:
|
||||
|
||||
@@ -178,8 +178,8 @@ def submit_bulk_course_email(request, course_id, email_id):
|
||||
The specified CourseEmail object will be sent be updated for all students who have enrolled
|
||||
in a course. Parameters are the `course_id` and the `email_id`, the id of the CourseEmail object.
|
||||
|
||||
AlreadyRunningError is raised if the course's students are already being emailed.
|
||||
TODO: is this the right behavior? Or should multiple emails be allowed in the pipeline at the same time?
|
||||
AlreadyRunningError is raised if the same recipients are already being emailed with the same
|
||||
CourseEmail object.
|
||||
|
||||
This method makes sure the InstructorTask entry is committed.
|
||||
When called from any view that is wrapped by TransactionMiddleware,
|
||||
@@ -188,11 +188,9 @@ def submit_bulk_course_email(request, course_id, email_id):
|
||||
save here. Any future database operations will take place in a
|
||||
separate transaction.
|
||||
"""
|
||||
# check arguments: make sure that the course is defined?
|
||||
# TODO: what is the right test here?
|
||||
|
||||
# This should also make sure that the email exists.
|
||||
# We can also pull out the To argument here, so that is displayed in
|
||||
# Assume that the course is defined, and that the user has already been verified to have
|
||||
# appropriate access to the course. But make sure that the email exists.
|
||||
# We also pull out the To argument here, so that is displayed in
|
||||
# the InstructorTask status.
|
||||
email_obj = CourseEmail.objects.get(id=email_id)
|
||||
to_option = email_obj.to_option
|
||||
|
||||
@@ -268,7 +268,7 @@ def submit_task(request, task_type, task_class, course_id, task_input, task_key)
|
||||
|
||||
# submit task:
|
||||
task_id = instructor_task.task_id
|
||||
task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)]
|
||||
task_args = [instructor_task.id, _get_xmodule_instance_args(request, task_id)] # pylint: disable=E1101
|
||||
task_class.apply_async(task_args, task_id=task_id)
|
||||
|
||||
return instructor_task
|
||||
|
||||
@@ -14,50 +14,75 @@ from instructor_task.models import InstructorTask, PROGRESS, QUEUING
|
||||
TASK_LOG = get_task_logger(__name__)
|
||||
|
||||
|
||||
def create_subtask_status(succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
|
||||
def create_subtask_status(task_id, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
|
||||
"""
|
||||
Create a dict for tracking the status of a subtask.
|
||||
Create and return a dict for tracking the status of a subtask.
|
||||
|
||||
Subtask status keys are:
|
||||
|
||||
'task_id' : id of subtask. This is used to pass task information across retries.
|
||||
'attempted' : number of attempts -- should equal succeeded plus failed
|
||||
'succeeded' : number that succeeded in processing
|
||||
'skipped' : number that were not processed.
|
||||
'failed' : number that failed during processing
|
||||
'retried_nomax' : number of times the subtask has been retried for conditions that
|
||||
should not have a maximum count applied
|
||||
'retried_withmax' : number of times the subtask has been retried for conditions that
|
||||
should have a maximum count applied
|
||||
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
|
||||
|
||||
Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'.
|
||||
TODO: update
|
||||
Object must be JSON-serializable, so that it can be passed as an argument
|
||||
to tasks.
|
||||
|
||||
TODO: decide if in future we want to include specific error information
|
||||
In future, we may want to include specific error information
|
||||
indicating the reason for failure.
|
||||
Also, we should count up "not attempted" separately from
|
||||
attempted/failed.
|
||||
Also, we should count up "not attempted" separately from attempted/failed.
|
||||
"""
|
||||
attempted = succeeded + failed
|
||||
current_result = {
|
||||
'task_id': task_id,
|
||||
'attempted': attempted,
|
||||
'succeeded': succeeded,
|
||||
'pending': pending,
|
||||
'skipped': skipped,
|
||||
'failed': failed,
|
||||
'retriedA': retriedA,
|
||||
'retriedB': retriedB,
|
||||
'retried_nomax': retried_nomax,
|
||||
'retried_withmax': retried_withmax,
|
||||
'state': state if state is not None else QUEUING,
|
||||
}
|
||||
return current_result
|
||||
|
||||
|
||||
def increment_subtask_status(subtask_result, succeeded=0, failed=0, pending=0, skipped=0, retriedA=0, retriedB=0, state=None):
|
||||
def increment_subtask_status(subtask_result, succeeded=0, failed=0, skipped=0, retried_nomax=0, retried_withmax=0, state=None):
|
||||
"""
|
||||
Update the result of a subtask with additional results.
|
||||
|
||||
Keys are: 'attempted', 'succeeded', 'skipped', 'failed', 'retried'.
|
||||
Create and return a dict for tracking the status of a subtask.
|
||||
|
||||
Keys for input `subtask_result` and returned subtask_status are:
|
||||
|
||||
'task_id' : id of subtask. This is used to pass task information across retries.
|
||||
'attempted' : number of attempts -- should equal succeeded plus failed
|
||||
'succeeded' : number that succeeded in processing
|
||||
'skipped' : number that were not processed.
|
||||
'failed' : number that failed during processing
|
||||
'retried_nomax' : number of times the subtask has been retried for conditions that
|
||||
should not have a maximum count applied
|
||||
'retried_withmax' : number of times the subtask has been retried for conditions that
|
||||
should have a maximum count applied
|
||||
'state' : celery state of the subtask (e.g. QUEUING, PROGRESS, RETRY, FAILURE, SUCCESS)
|
||||
|
||||
Kwarg arguments are incremented to the corresponding key in `subtask_result`.
|
||||
The exception is for `state`, which if specified is used to override the existing value.
|
||||
"""
|
||||
# TODO: rewrite this if we have additional fields added to original subtask_result,
|
||||
# that are not part of the increment. Tradeoff on duplicating the 'attempts' logic.
|
||||
new_result = create_subtask_status(succeeded, failed, pending, skipped, retriedA, retriedB, state)
|
||||
for keyname in new_result:
|
||||
if keyname == 'state':
|
||||
# does not get incremented. If no new value, copy old value:
|
||||
if state is None:
|
||||
new_result[keyname] = subtask_result[keyname]
|
||||
elif keyname in subtask_result:
|
||||
new_result[keyname] += subtask_result[keyname]
|
||||
new_result = dict(subtask_result)
|
||||
new_result['attempted'] += (succeeded + failed)
|
||||
new_result['succeeded'] += succeeded
|
||||
new_result['failed'] += failed
|
||||
new_result['skipped'] += skipped
|
||||
new_result['retried_nomax'] += retried_nomax
|
||||
new_result['retried_withmax'] += retried_withmax
|
||||
if state is not None:
|
||||
new_result['state'] = state
|
||||
|
||||
return new_result
|
||||
|
||||
@@ -70,7 +95,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
|
||||
Counters for 'attempted', 'succeeded', 'failed', 'skipped' keys are initialized to zero,
|
||||
as is the 'duration_ms' value. A 'start_time' is stored for later duration calculations,
|
||||
and the total number of "things to do" is set, so the user can be told how much needs to be
|
||||
done overall. The `action_name` is also stored, to also help with constructing more readable
|
||||
done overall. The `action_name` is also stored, to help with constructing more readable
|
||||
task_progress messages.
|
||||
|
||||
The InstructorTask's "subtasks" field is also initialized. This is also a JSON-serialized dict.
|
||||
@@ -80,8 +105,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
|
||||
the InstructorTask's "status" will be changed to SUCCESS.
|
||||
|
||||
The "subtasks" field also contains a 'status' key, that contains a dict that stores status
|
||||
information for each subtask. At the moment, the value for each subtask (keyed by its task_id)
|
||||
is the value of `status`, which is initialized here to QUEUING.
|
||||
information for each subtask. The value for each subtask (keyed by its task_id)
|
||||
is its subtask status, as defined by create_subtask_status().
|
||||
|
||||
This information needs to be set up in the InstructorTask before any of the subtasks start
|
||||
running. If not, there is a chance that the subtasks could complete before the parent task
|
||||
@@ -92,7 +117,6 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
|
||||
rely on the status stored in the InstructorTask object, rather than status stored in the
|
||||
corresponding AsyncResult.
|
||||
"""
|
||||
# TODO: also add 'pending' count here? (Even though it's total-attempted-skipped
|
||||
task_progress = {
|
||||
'action_name': action_name,
|
||||
'attempted': 0,
|
||||
@@ -108,12 +132,8 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
|
||||
|
||||
# Write out the subtasks information.
|
||||
num_subtasks = len(subtask_id_list)
|
||||
# using fromkeys to initialize uses a single value. we need the value
|
||||
# to be distinct, since it's now a dict:
|
||||
# subtask_status = dict.fromkeys(subtask_id_list, QUEUING)
|
||||
# TODO: may not be necessary to store initial value with all those zeroes!
|
||||
# Instead, use a placemarker....
|
||||
subtask_status = {subtask_id: create_subtask_status() for subtask_id in subtask_id_list}
|
||||
# Note that may not be necessary to store initial value with all those zeroes!
|
||||
subtask_status = {subtask_id: create_subtask_status(subtask_id) for subtask_id in subtask_id_list}
|
||||
subtask_dict = {
|
||||
'total': num_subtasks,
|
||||
'succeeded': 0,
|
||||
@@ -129,7 +149,7 @@ def update_instructor_task_for_subtasks(entry, action_name, total_num, subtask_i
|
||||
|
||||
|
||||
@transaction.commit_manually
|
||||
def update_subtask_status(entry_id, current_task_id, subtask_status):
|
||||
def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
"""
|
||||
Update the status of the subtask in the parent InstructorTask object tracking its progress.
|
||||
|
||||
@@ -138,9 +158,11 @@ def update_subtask_status(entry_id, current_task_id, subtask_status):
|
||||
committed on completion, or rolled back on error.
|
||||
|
||||
The InstructorTask's "task_output" field is updated. This is a JSON-serialized dict.
|
||||
Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `subtask_progress`
|
||||
Accumulates values for 'attempted', 'succeeded', 'failed', 'skipped' from `new_subtask_status`
|
||||
into the corresponding values in the InstructorTask's task_output. Also updates the 'duration_ms'
|
||||
value with the current interval since the original InstructorTask started.
|
||||
value with the current interval since the original InstructorTask started. Note that this
|
||||
value is only approximate, since the subtask may be running on a different server than the
|
||||
original task, so is subject to clock skew.
|
||||
|
||||
The InstructorTask's "subtasks" field is also updated. This is also a JSON-serialized dict.
|
||||
Keys include 'total', 'succeeded', 'retried', 'failed', which are counters for the number of
|
||||
@@ -155,13 +177,13 @@ def update_subtask_status(entry_id, current_task_id, subtask_status):
|
||||
messages, progress made, etc.
|
||||
"""
|
||||
TASK_LOG.info("Preparing to update status for email subtask %s for instructor task %d with status %s",
|
||||
current_task_id, entry_id, subtask_status)
|
||||
current_task_id, entry_id, new_subtask_status)
|
||||
|
||||
try:
|
||||
entry = InstructorTask.objects.select_for_update().get(pk=entry_id)
|
||||
subtask_dict = json.loads(entry.subtasks)
|
||||
subtask_status = subtask_dict['status']
|
||||
if current_task_id not in subtask_status:
|
||||
subtask_status_info = subtask_dict['status']
|
||||
if current_task_id not in subtask_status_info:
|
||||
# unexpected error -- raise an exception
|
||||
format_str = "Unexpected task_id '{}': unable to update status for email subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry_id)
|
||||
@@ -173,39 +195,45 @@ def update_subtask_status(entry_id, current_task_id, subtask_status):
|
||||
# will be updating before the original call, and we don't want their
|
||||
# ultimate status to be clobbered by the "earlier" updates. This
|
||||
# should not be a problem in normal (non-eager) processing.
|
||||
old_status = subtask_status[current_task_id]
|
||||
# TODO: check this logic...
|
||||
state = subtask_status['state']
|
||||
# if state != RETRY or old_status['status'] == QUEUING:
|
||||
# instead replace the status only if it's 'newer'
|
||||
# i.e. has fewer pending
|
||||
if subtask_status['pending'] <= old_status['pending']:
|
||||
subtask_status[current_task_id] = subtask_status
|
||||
current_subtask_status = subtask_status_info[current_task_id]
|
||||
current_state = current_subtask_status['state']
|
||||
new_state = new_subtask_status['state']
|
||||
if new_state != RETRY or current_state == QUEUING or current_state in READY_STATES:
|
||||
subtask_status_info[current_task_id] = new_subtask_status
|
||||
|
||||
# Update the parent task progress
|
||||
# Set the estimate of duration, but only if it
|
||||
# increases. Clock skew between time() returned by different machines
|
||||
# may result in non-monotonic values for duration.
|
||||
task_progress = json.loads(entry.task_output)
|
||||
start_time = task_progress['start_time']
|
||||
task_progress['duration_ms'] = int((time() - start_time) * 1000)
|
||||
# change behavior so we don't update on progress now:
|
||||
# TODO: figure out if we can make this more responsive later,
|
||||
# by figuring out how to handle retries better.
|
||||
if subtask_status is not None and state in READY_STATES:
|
||||
prev_duration = task_progress['duration_ms']
|
||||
new_duration = int((time() - start_time) * 1000)
|
||||
task_progress['duration_ms'] = max(prev_duration, new_duration)
|
||||
|
||||
# Update counts only when subtask is done.
|
||||
# In future, we can make this more responsive by updating status
|
||||
# between retries, by comparing counts that change from previous
|
||||
# retry.
|
||||
if new_subtask_status is not None and new_state in READY_STATES:
|
||||
for statname in ['attempted', 'succeeded', 'failed', 'skipped']:
|
||||
task_progress[statname] += subtask_status[statname]
|
||||
task_progress[statname] += new_subtask_status[statname]
|
||||
|
||||
# Figure out if we're actually done (i.e. this is the last task to complete).
|
||||
# This is easier if we just maintain a counter, rather than scanning the
|
||||
# entire subtask_status dict.
|
||||
if state == SUCCESS:
|
||||
# entire new_subtask_status dict.
|
||||
if new_state == SUCCESS:
|
||||
subtask_dict['succeeded'] += 1
|
||||
elif state == RETRY:
|
||||
elif new_state == RETRY:
|
||||
subtask_dict['retried'] += 1
|
||||
else:
|
||||
subtask_dict['failed'] += 1
|
||||
num_remaining = subtask_dict['total'] - subtask_dict['succeeded'] - subtask_dict['failed']
|
||||
# If we're done with the last task, update the parent status to indicate that:
|
||||
# TODO: see if there was a catastrophic failure that occurred, and figure out
|
||||
# how to report that here.
|
||||
|
||||
# If we're done with the last task, update the parent status to indicate that.
|
||||
# At present, we mark the task as having succeeded. In future, we should see
|
||||
# if there was a catastrophic failure that occurred, and figure out how to
|
||||
# report that here.
|
||||
if num_remaining <= 0:
|
||||
entry.task_state = SUCCESS
|
||||
entry.subtasks = json.dumps(subtask_dict)
|
||||
|
||||
@@ -32,7 +32,7 @@ from instructor_task.tasks_helper import (
|
||||
from bulk_email.tasks import perform_delegate_email_batches
|
||||
|
||||
|
||||
@task(base=BaseInstructorTask)
|
||||
@task(base=BaseInstructorTask) # pylint: disable=E1102
|
||||
def rescore_problem(entry_id, xmodule_instance_args):
|
||||
"""Rescores a problem in a course, for all students or one specific student.
|
||||
|
||||
@@ -55,13 +55,14 @@ def rescore_problem(entry_id, xmodule_instance_args):
|
||||
update_fcn = partial(rescore_problem_module_state, xmodule_instance_args)
|
||||
|
||||
def filter_fcn(modules_to_update):
|
||||
"""Filter that matches problems which are marked as being done"""
|
||||
return modules_to_update.filter(state__contains='"done": true')
|
||||
|
||||
visit_fcn = partial(perform_module_state_update, update_fcn, filter_fcn)
|
||||
return run_main_task(entry_id, visit_fcn, action_name)
|
||||
|
||||
|
||||
@task(base=BaseInstructorTask)
|
||||
@task(base=BaseInstructorTask) # pylint: disable=E1102
|
||||
def reset_problem_attempts(entry_id, xmodule_instance_args):
|
||||
"""Resets problem attempts to zero for a particular problem for all students in a course.
|
||||
|
||||
@@ -82,7 +83,7 @@ def reset_problem_attempts(entry_id, xmodule_instance_args):
|
||||
return run_main_task(entry_id, visit_fcn, action_name)
|
||||
|
||||
|
||||
@task(base=BaseInstructorTask)
|
||||
@task(base=BaseInstructorTask) # pylint: disable=E1102
|
||||
def delete_problem_state(entry_id, xmodule_instance_args):
|
||||
"""Deletes problem state entirely for all students on a particular problem in a course.
|
||||
|
||||
@@ -103,18 +104,20 @@ def delete_problem_state(entry_id, xmodule_instance_args):
|
||||
return run_main_task(entry_id, visit_fcn, action_name)
|
||||
|
||||
|
||||
@task(base=BaseInstructorTask)
|
||||
def send_bulk_course_email(entry_id, xmodule_instance_args):
|
||||
"""Sends emails to in a course.
|
||||
@task(base=BaseInstructorTask) # pylint: disable=E1102
|
||||
def send_bulk_course_email(entry_id, _xmodule_instance_args):
|
||||
"""Sends emails to recipients enrolled in a course.
|
||||
|
||||
`entry_id` is the id value of the InstructorTask entry that corresponds to this task.
|
||||
The entry contains the `course_id` that identifies the course, as well as the
|
||||
`task_input`, which contains task-specific input.
|
||||
|
||||
The task_input should be a dict with no entries.
|
||||
The task_input should be a dict with the following entries:
|
||||
|
||||
`xmodule_instance_args` provides information needed by _get_module_instance_for_task()
|
||||
to instantiate an xmodule instance.
|
||||
'email_id': the full URL to the problem to be rescored. (required)
|
||||
|
||||
`_xmodule_instance_args` provides information needed by _get_module_instance_for_task()
|
||||
to instantiate an xmodule instance. This is unused here.
|
||||
"""
|
||||
action_name = 'emailed'
|
||||
visit_fcn = perform_delegate_email_batches
|
||||
|
||||
@@ -42,6 +42,12 @@ class BaseInstructorTask(Task):
|
||||
Permits updating information about task in corresponding InstructorTask for monitoring purposes.
|
||||
|
||||
Assumes that the entry_id of the InstructorTask model is the first argument to the task.
|
||||
|
||||
The `entry_id` is the primary key for the InstructorTask entry representing the task. This class
|
||||
updates the entry on success and failure of the task it wraps. It is setting the entry's value
|
||||
for task_state based on what Celery would set it to once the task returns to Celery:
|
||||
FAILURE if an exception is encountered, and SUCCESS if it returns normally.
|
||||
Other arguments are pass-throughs to perform_module_state_update, and documented there.
|
||||
"""
|
||||
abstract = True
|
||||
|
||||
@@ -51,8 +57,22 @@ class BaseInstructorTask(Task):
|
||||
|
||||
Updates task_output and task_state. But it shouldn't actually do anything
|
||||
if the task is only creating subtasks to actually do the work.
|
||||
|
||||
Assumes `task_progress` is a dict containing the task's result, with the following keys:
|
||||
|
||||
'attempted': number of attempts made
|
||||
'succeeded': number of attempts that "succeeded"
|
||||
'skipped': number of attempts that "skipped"
|
||||
'failed': number of attempts that "failed"
|
||||
'total': number of possible subtasks to attempt
|
||||
'action_name': user-visible verb to use in status messages. Should be past-tense.
|
||||
Pass-through of input `action_name`.
|
||||
'duration_ms': how long the task has (or had) been running.
|
||||
|
||||
This is JSON-serialized and stored in the task_output column of the InstructorTask entry.
|
||||
|
||||
"""
|
||||
TASK_LOG.info('Task success returned: %r' % (self.request, ))
|
||||
TASK_LOG.debug('Task %s: success returned with progress: %s', task_id, task_progress)
|
||||
# We should be able to find the InstructorTask object to update
|
||||
# based on the task_id here, without having to dig into the
|
||||
# original args to the task. On the other hand, the entry_id
|
||||
@@ -72,9 +92,20 @@ class BaseInstructorTask(Task):
|
||||
"""
|
||||
Update InstructorTask object corresponding to this task with info about failure.
|
||||
|
||||
Fetches and updates exception and traceback information on failure.
|
||||
Fetches and updates exception and traceback information on failure.
|
||||
|
||||
If an exception is raised internal to the task, it is caught by celery and provided here.
|
||||
The information is recorded in the InstructorTask object as a JSON-serialized dict
|
||||
stored in the task_output column. It contains the following keys:
|
||||
|
||||
'exception': type of exception object
|
||||
'message': error message from exception object
|
||||
'traceback': traceback information (truncated if necessary)
|
||||
|
||||
Note that there is no way to record progress made within the task (e.g. attempted,
|
||||
succeeded, etc.) when such failures occur.
|
||||
"""
|
||||
TASK_LOG.info('Task failure returned: %r' % (self.request, ))
|
||||
TASK_LOG.debug('Task %s: failure returned', task_id)
|
||||
entry_id = args[0]
|
||||
try:
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
@@ -88,12 +119,6 @@ class BaseInstructorTask(Task):
|
||||
entry.task_state = FAILURE
|
||||
entry.save_now()
|
||||
|
||||
def on_retry(self, exc, task_id, args, kwargs, einfo):
|
||||
# We don't expect this to be called for top-level tasks, at the moment....
|
||||
# If it were, not sure what kind of status to report for it.
|
||||
# But it would be good to know that it's being called, so at least log it.
|
||||
TASK_LOG.info('Task retry returned: %r' % (self.request, ))
|
||||
|
||||
|
||||
class UpdateProblemModuleStateError(Exception):
|
||||
"""
|
||||
@@ -110,6 +135,67 @@ def _get_current_task():
|
||||
return current_task
|
||||
|
||||
|
||||
def run_main_task(entry_id, task_fcn, action_name):
|
||||
"""
|
||||
Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.
|
||||
|
||||
Arguments passed to `task_fcn` are:
|
||||
|
||||
`entry_id` : the primary key for the InstructorTask entry representing the task.
|
||||
`course_id` : the id for the course.
|
||||
`task_input` : dict containing task-specific arguments, JSON-decoded from InstructorTask's task_input.
|
||||
`action_name` : past-tense verb to use for constructing status messages.
|
||||
|
||||
If no exceptions are raised, the `task_fcn` should return a dict containing
|
||||
the task's result with the following keys:
|
||||
|
||||
'attempted': number of attempts made
|
||||
'succeeded': number of attempts that "succeeded"
|
||||
'skipped': number of attempts that "skipped"
|
||||
'failed': number of attempts that "failed"
|
||||
'total': number of possible subtasks to attempt
|
||||
'action_name': user-visible verb to use in status messages.
|
||||
Should be past-tense. Pass-through of input `action_name`.
|
||||
'duration_ms': how long the task has (or had) been running.
|
||||
|
||||
"""
|
||||
|
||||
# get the InstructorTask to be updated. If this fails, then let the exception return to Celery.
|
||||
# There's no point in catching it here.
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
|
||||
# get inputs to use in this task from the entry:
|
||||
task_id = entry.task_id
|
||||
course_id = entry.course_id
|
||||
task_input = json.loads(entry.task_input)
|
||||
|
||||
# construct log message:
|
||||
fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"'
|
||||
task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input)
|
||||
|
||||
TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)
|
||||
|
||||
# Check that the task_id submitted in the InstructorTask matches the current task
|
||||
# that is running.
|
||||
request_task_id = _get_current_task().request.id
|
||||
if task_id != request_task_id:
|
||||
fmt = 'Requested task did not match actual task "{actual_id}": {task_info}'
|
||||
message = fmt.format(actual_id=request_task_id, task_info=task_info_string)
|
||||
TASK_LOG.error(message)
|
||||
raise ValueError(message)
|
||||
|
||||
# Now do the work:
|
||||
with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
|
||||
task_progress = task_fcn(entry_id, course_id, task_input, action_name)
|
||||
|
||||
# Release any queries that the connection has been hanging onto:
|
||||
reset_queries()
|
||||
|
||||
# log and exit, returning task_progress info as task result:
|
||||
TASK_LOG.info('Finishing %s: final: %s', task_info_string, task_progress)
|
||||
return task_progress
|
||||
|
||||
|
||||
def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, task_input, action_name):
|
||||
"""
|
||||
Performs generic update by visiting StudentModule instances with the update_fcn provided.
|
||||
@@ -220,92 +306,13 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
|
||||
return task_progress
|
||||
|
||||
|
||||
def run_main_task(entry_id, task_fcn, action_name):
|
||||
"""
|
||||
Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.
|
||||
|
||||
TODO: UPDATE THIS DOCSTRING
|
||||
(IT's not just visiting StudentModule instances....)
|
||||
|
||||
Performs generic update by visiting StudentModule instances with the update_fcn provided.
|
||||
|
||||
The `entry_id` is the primary key for the InstructorTask entry representing the task. This function
|
||||
updates the entry on success and failure of the perform_module_state_update function it
|
||||
wraps. It is setting the entry's value for task_state based on what Celery would set it to once
|
||||
the task returns to Celery: FAILURE if an exception is encountered, and SUCCESS if it returns normally.
|
||||
Other arguments are pass-throughs to perform_module_state_update, and documented there.
|
||||
|
||||
If no exceptions are raised, a dict containing the task's result is returned, with the following keys:
|
||||
|
||||
'attempted': number of attempts made
|
||||
'succeeded': number of attempts that "succeeded"
|
||||
'skipped': number of attempts that "skipped"
|
||||
'failed': number of attempts that "failed"
|
||||
'total': number of possible subtasks to attempt
|
||||
'action_name': user-visible verb to use in status messages. Should be past-tense.
|
||||
Pass-through of input `action_name`.
|
||||
'duration_ms': how long the task has (or had) been running.
|
||||
|
||||
Before returning, this is also JSON-serialized and stored in the task_output column of the InstructorTask entry.
|
||||
|
||||
If an exception is raised internally, it is caught and recorded in the InstructorTask entry.
|
||||
This is also a JSON-serialized dict, stored in the task_output column, containing the following keys:
|
||||
|
||||
'exception': type of exception object
|
||||
'message': error message from exception object
|
||||
'traceback': traceback information (truncated if necessary)
|
||||
|
||||
Once the exception is caught, it is raised again and allowed to pass up to the
|
||||
task-running level, so that it can also set the failure modes and capture the error trace in the
|
||||
result object that Celery creates.
|
||||
|
||||
"""
|
||||
|
||||
# get the InstructorTask to be updated. If this fails, then let the exception return to Celery.
|
||||
# There's no point in catching it here.
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
|
||||
# get inputs to use in this task from the entry:
|
||||
task_id = entry.task_id
|
||||
course_id = entry.course_id
|
||||
task_input = json.loads(entry.task_input)
|
||||
|
||||
# construct log message:
|
||||
fmt = 'task "{task_id}": course "{course_id}" input "{task_input}"'
|
||||
task_info_string = fmt.format(task_id=task_id, course_id=course_id, task_input=task_input)
|
||||
|
||||
TASK_LOG.info('Starting update (nothing %s yet): %s', action_name, task_info_string)
|
||||
|
||||
# Check that the task_id submitted in the InstructorTask matches the current task
|
||||
# that is running.
|
||||
request_task_id = _get_current_task().request.id
|
||||
if task_id != request_task_id:
|
||||
fmt = 'Requested task did not match actual task "{actual_id}": {task_info}'
|
||||
message = fmt.format(actual_id=request_task_id, task_info=task_info_string)
|
||||
TASK_LOG.error(message)
|
||||
raise UpdateProblemModuleStateError(message)
|
||||
|
||||
# Now do the work:
|
||||
with dog_stats_api.timer('instructor_tasks.time.overall', tags=['action:{name}'.format(name=action_name)]):
|
||||
task_progress = task_fcn(entry_id, course_id, task_input, action_name)
|
||||
|
||||
# Release any queries that the connection has been hanging onto:
|
||||
reset_queries()
|
||||
|
||||
# log and exit, returning task_progress info as task result:
|
||||
TASK_LOG.info('Finishing %s: final: %s', task_info_string, task_progress)
|
||||
return task_progress
|
||||
|
||||
|
||||
def _get_task_id_from_xmodule_args(xmodule_instance_args):
|
||||
"""Gets task_id from `xmodule_instance_args` dict, or returns default value if missing."""
|
||||
return xmodule_instance_args.get('task_id', UNKNOWN_TASK_ID) if xmodule_instance_args is not None else UNKNOWN_TASK_ID
|
||||
|
||||
|
||||
def _get_xqueue_callback_url_prefix(xmodule_instance_args):
|
||||
"""
|
||||
|
||||
"""
|
||||
"""Gets prefix to use when constructing xqueue_callback_url."""
|
||||
return xmodule_instance_args.get('xqueue_callback_url_prefix', '') if xmodule_instance_args is not None else ''
|
||||
|
||||
|
||||
|
||||
@@ -152,15 +152,16 @@ class InstructorTaskCourseSubmitTest(InstructorTaskCourseTestCase):
|
||||
self.instructor = UserFactory.create(username="instructor", email="instructor@edx.org")
|
||||
|
||||
def _define_course_email(self):
|
||||
"""Create CourseEmail object for testing."""
|
||||
course_email = CourseEmail.create(self.course.id, self.instructor, SEND_TO_ALL, "Test Subject", "<p>This is a test message</p>")
|
||||
return course_email.id
|
||||
return course_email.id # pylint: disable=E1101
|
||||
|
||||
def test_submit_bulk_email_all(self):
|
||||
email_id = self._define_course_email()
|
||||
instructor_task = submit_bulk_course_email(self.create_task_request(self.instructor), self.course.id, email_id)
|
||||
|
||||
# test resubmitting, by updating the existing record:
|
||||
instructor_task = InstructorTask.objects.get(id=instructor_task.id)
|
||||
instructor_task = InstructorTask.objects.get(id=instructor_task.id) # pylint: disable=E1101
|
||||
instructor_task.task_state = PROGRESS
|
||||
instructor_task.save()
|
||||
|
||||
|
||||
@@ -85,11 +85,11 @@ class TestInstructorTasks(InstructorTaskModuleTestCase):
|
||||
def _test_missing_current_task(self, task_class):
|
||||
"""Check that a task_class fails when celery doesn't provide a current_task."""
|
||||
task_entry = self._create_input_entry()
|
||||
with self.assertRaises(UpdateProblemModuleStateError):
|
||||
with self.assertRaises(ValueError):
|
||||
task_class(task_entry.id, self._get_xmodule_instance_args())
|
||||
|
||||
def _test_undefined_course(self, task_class):
|
||||
# run with celery, but no course defined
|
||||
"""Run with celery, but with no course defined."""
|
||||
task_entry = self._create_input_entry(course_id="bogus/course/id")
|
||||
with self.assertRaises(ItemNotFoundError):
|
||||
self._run_task_with_mock_celery(task_class, task_entry.id, task_entry.task_id)
|
||||
|
||||
@@ -93,6 +93,10 @@ CELERY_QUEUES = {
|
||||
DEFAULT_PRIORITY_QUEUE: {}
|
||||
}
|
||||
|
||||
# We want Bulk Email running on the high-priority queue, so we define the
|
||||
# routing key that points to it. At the moment, the name is the same.
|
||||
BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE
|
||||
|
||||
########################## NON-SECURE ENV CONFIG ##############################
|
||||
# Things like server locations, ports, etc.
|
||||
|
||||
@@ -128,7 +132,7 @@ LOG_DIR = ENV_TOKENS['LOG_DIR']
|
||||
|
||||
CACHES = ENV_TOKENS['CACHES']
|
||||
|
||||
#Email overrides
|
||||
# Email overrides
|
||||
DEFAULT_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_FROM_EMAIL', DEFAULT_FROM_EMAIL)
|
||||
DEFAULT_FEEDBACK_EMAIL = ENV_TOKENS.get('DEFAULT_FEEDBACK_EMAIL', DEFAULT_FEEDBACK_EMAIL)
|
||||
DEFAULT_BULK_FROM_EMAIL = ENV_TOKENS.get('DEFAULT_BULK_FROM_EMAIL', DEFAULT_BULK_FROM_EMAIL)
|
||||
@@ -140,8 +144,10 @@ BUGS_EMAIL = ENV_TOKENS.get('BUGS_EMAIL', BUGS_EMAIL)
|
||||
PAYMENT_SUPPORT_EMAIL = ENV_TOKENS.get('PAYMENT_SUPPORT_EMAIL', PAYMENT_SUPPORT_EMAIL)
|
||||
PAID_COURSE_REGISTRATION_CURRENCY = ENV_TOKENS.get('PAID_COURSE_REGISTRATION_CURRENCY',
|
||||
PAID_COURSE_REGISTRATION_CURRENCY)
|
||||
BULK_EMAIL_DEFAULT_RETRY_DELAY = ENV_TOKENS.get('BULK_EMAIL_DEFAULT_RETRY_DELAY', BULK_EMAIL_DEFAULT_RETRY_DELAY)
|
||||
BULK_EMAIL_MAX_RETRIES = ENV_TOKENS.get('BULK_EMAIL_MAX_RETRIES', BULK_EMAIL_MAX_RETRIES)
|
||||
|
||||
#Theme overrides
|
||||
# Theme overrides
|
||||
THEME_NAME = ENV_TOKENS.get('THEME_NAME', None)
|
||||
if not THEME_NAME is None:
|
||||
enable_theme(THEME_NAME)
|
||||
@@ -150,10 +156,10 @@ if not THEME_NAME is None:
|
||||
# Marketing link overrides
|
||||
MKTG_URL_LINK_MAP.update(ENV_TOKENS.get('MKTG_URL_LINK_MAP', {}))
|
||||
|
||||
#Timezone overrides
|
||||
# Timezone overrides
|
||||
TIME_ZONE = ENV_TOKENS.get('TIME_ZONE', TIME_ZONE)
|
||||
|
||||
#Additional installed apps
|
||||
# Additional installed apps
|
||||
for app in ENV_TOKENS.get('ADDL_INSTALLED_APPS', []):
|
||||
INSTALLED_APPS += (app,)
|
||||
|
||||
|
||||
@@ -795,6 +795,17 @@ CELERY_QUEUES = {
|
||||
DEFAULT_PRIORITY_QUEUE: {}
|
||||
}
|
||||
|
||||
# let logging work as configured:
|
||||
CELERYD_HIJACK_ROOT_LOGGER = False
|
||||
|
||||
################################ Bulk Email ###################################
|
||||
|
||||
# We want Bulk Email running on the high-priority queue, so we define the
|
||||
# routing key that points to it. At the moment, the name is the same.
|
||||
BULK_EMAIL_ROUTING_KEY = HIGH_PRIORITY_QUEUE
|
||||
BULK_EMAIL_DEFAULT_RETRY_DELAY = 15
|
||||
BULK_EMAIL_MAX_RETRIES = 5
|
||||
|
||||
################################### APPS ######################################
|
||||
INSTALLED_APPS = (
|
||||
# Standard ones that are always installed...
|
||||
|
||||
Reference in New Issue
Block a user