Merge pull request #16756 from edx/pacing/log-on-failure
Schedules: Add celery task logging
This commit is contained in:
@@ -1,8 +1,7 @@
|
||||
from celery import task
|
||||
from logging import getLogger
|
||||
|
||||
from celery_utils.logged_task import LoggedTask
|
||||
from celery_utils.persist_on_failure import PersistOnFailureTask
|
||||
from celery_utils.persist_on_failure import LoggedPersistOnFailureTask
|
||||
from django.contrib.auth.models import User
|
||||
from lms.djangoapps.verify_student.models import SoftwareSecurePhotoVerification
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
@@ -12,14 +11,7 @@ from .api import generate_user_certificates
|
||||
logger = getLogger(__name__)
|
||||
|
||||
|
||||
class _BaseCertificateTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-method
|
||||
"""
|
||||
Include persistence features, as well as logging of task invocation.
|
||||
"""
|
||||
abstract = True
|
||||
|
||||
|
||||
@task(base=_BaseCertificateTask, bind=True, default_retry_delay=30, max_retries=2)
|
||||
@task(base=LoggedPersistOnFailureTask, bind=True, default_retry_delay=30, max_retries=2)
|
||||
def generate_certificate(self, **kwargs):
|
||||
"""
|
||||
Generates a certificate for a single user.
|
||||
|
||||
@@ -6,8 +6,7 @@ from logging import getLogger
|
||||
|
||||
import six
|
||||
from celery import task
|
||||
from celery_utils.logged_task import LoggedTask
|
||||
from celery_utils.persist_on_failure import PersistOnFailureTask
|
||||
from celery_utils.persist_on_failure import LoggedPersistOnFailureTask
|
||||
from courseware.model_data import get_score
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
@@ -46,14 +45,7 @@ RETRY_DELAY_SECONDS = 30
|
||||
SUBSECTION_GRADE_TIMEOUT_SECONDS = 300
|
||||
|
||||
|
||||
class _BaseTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-method
|
||||
"""
|
||||
Include persistence features, as well as logging of task invocation.
|
||||
"""
|
||||
abstract = True
|
||||
|
||||
|
||||
@task(base=_BaseTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
|
||||
@task(base=LoggedPersistOnFailureTask, routing_key=settings.POLICY_CHANGE_GRADES_ROUTING_KEY)
|
||||
def compute_all_grades_for_course(**kwargs):
|
||||
"""
|
||||
Compute grades for all students in the specified course.
|
||||
@@ -77,7 +69,7 @@ def compute_all_grades_for_course(**kwargs):
|
||||
|
||||
@task(
|
||||
bind=True,
|
||||
base=_BaseTask,
|
||||
base=LoggedPersistOnFailureTask,
|
||||
default_retry_delay=RETRY_DELAY_SECONDS,
|
||||
max_retries=1,
|
||||
time_limit=COURSE_GRADE_TIMEOUT_SECONDS
|
||||
@@ -105,7 +97,7 @@ def compute_grades_for_course_v2(self, **kwargs):
|
||||
raise self.retry(kwargs=kwargs, exc=exc)
|
||||
|
||||
|
||||
@task(base=_BaseTask)
|
||||
@task(base=LoggedPersistOnFailureTask)
|
||||
def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pylint: disable=unused-argument
|
||||
"""
|
||||
Compute and save grades for a set of students in the specified course.
|
||||
@@ -124,7 +116,7 @@ def compute_grades_for_course(course_key, offset, batch_size, **kwargs): # pyli
|
||||
|
||||
@task(
|
||||
bind=True,
|
||||
base=_BaseTask,
|
||||
base=LoggedPersistOnFailureTask,
|
||||
time_limit=SUBSECTION_GRADE_TIMEOUT_SECONDS,
|
||||
max_retries=2,
|
||||
default_retry_delay=RETRY_DELAY_SECONDS,
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import logging
|
||||
|
||||
from celery import task
|
||||
from celery_utils.logged_task import LoggedTask
|
||||
from celery_utils.persist_on_failure import PersistOnFailureTask
|
||||
from celery_utils.persist_on_failure import LoggedPersistOnFailureTask
|
||||
from django.conf import settings
|
||||
|
||||
from opaque_keys.edx.keys import CourseKey
|
||||
@@ -25,13 +24,6 @@ def chunks(sequence, chunk_size):
|
||||
return (sequence[index: index + chunk_size] for index in xrange(0, len(sequence), chunk_size))
|
||||
|
||||
|
||||
class _BaseTask(PersistOnFailureTask, LoggedTask): # pylint: disable=abstract-method
|
||||
"""
|
||||
Include persistence features, as well as logging of task invocation.
|
||||
"""
|
||||
abstract = True
|
||||
|
||||
|
||||
def _task_options(routing_key):
|
||||
task_options = {}
|
||||
if getattr(settings, 'HIGH_MEM_QUEUE', None):
|
||||
@@ -64,7 +56,7 @@ def enqueue_async_course_overview_update_tasks(
|
||||
)
|
||||
|
||||
|
||||
@task(base=_BaseTask)
|
||||
@task(base=LoggedPersistOnFailureTask)
|
||||
def async_course_overview_update(*args, **kwargs):
|
||||
course_keys = [CourseKey.from_string(arg) for arg in args]
|
||||
CourseOverview.update_select_courses(course_keys, force_update=kwargs['force_update'])
|
||||
|
||||
@@ -200,7 +200,7 @@ class ScheduleSendEmailTestMixin(FilteredQueryCountMixin):
|
||||
is_first_match = False
|
||||
|
||||
with self.assertNumQueries(expected_queries, table_blacklist=WAFFLE_TABLES):
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=target_day_str, day_offset=offset, bin_num=b,
|
||||
))
|
||||
|
||||
@@ -220,15 +220,15 @@ class ScheduleSendEmailTestMixin(FilteredQueryCountMixin):
|
||||
course_id=CourseKey.from_string('edX/toy/Not_2012_Fall'),
|
||||
user=UserFactory.create(),
|
||||
)
|
||||
schedule = self._schedule_factory(enrollment=enrollment)
|
||||
self._schedule_factory(enrollment=enrollment)
|
||||
|
||||
with patch.object(self.task, 'async_send_task') as mock_schedule_send:
|
||||
for b in range(self.task.num_bins):
|
||||
self.task.apply(kwargs=dict(
|
||||
for bin_num in range(self.task().num_bins):
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id,
|
||||
target_day_str=serialize(target_day),
|
||||
day_offset=offset,
|
||||
bin_num=b,
|
||||
bin_num=bin_num,
|
||||
))
|
||||
|
||||
# There is no database constraint that enforces that enrollment.course_id points
|
||||
@@ -308,7 +308,7 @@ class ScheduleSendEmailTestMixin(FilteredQueryCountMixin):
|
||||
)
|
||||
|
||||
with patch.object(self.task, 'async_send_task') as mock_schedule_send:
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=this_config.site.id, target_day_str=serialize(target_day), day_offset=offset, bin_num=0
|
||||
))
|
||||
|
||||
@@ -328,7 +328,7 @@ class ScheduleSendEmailTestMixin(FilteredQueryCountMixin):
|
||||
)
|
||||
|
||||
with patch.object(self.task, 'async_send_task') as mock_schedule_send:
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset, bin_num=0,
|
||||
))
|
||||
|
||||
@@ -354,7 +354,7 @@ class ScheduleSendEmailTestMixin(FilteredQueryCountMixin):
|
||||
expected_query_count = NUM_QUERIES_FIRST_MATCH + additional_course_queries
|
||||
with self.assertNumQueries(expected_query_count, table_blacklist=WAFFLE_TABLES):
|
||||
with patch.object(self.task, 'async_send_task') as mock_schedule_send:
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset,
|
||||
bin_num=self._calculate_bin_for_user(user),
|
||||
))
|
||||
@@ -402,7 +402,7 @@ class ScheduleSendEmailTestMixin(FilteredQueryCountMixin):
|
||||
num_expected_queries += 1
|
||||
|
||||
with self.assertNumQueries(num_expected_queries, table_blacklist=WAFFLE_TABLES):
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset,
|
||||
bin_num=self._calculate_bin_for_user(user),
|
||||
))
|
||||
@@ -436,7 +436,7 @@ class ScheduleSendEmailTestMixin(FilteredQueryCountMixin):
|
||||
schedule = self._schedule_factory(**kwargs)
|
||||
|
||||
with patch.object(tasks, 'ace') as mock_ace:
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset,
|
||||
bin_num=self._calculate_bin_for_user(schedule.enrollment.user),
|
||||
))
|
||||
|
||||
@@ -87,7 +87,7 @@ class TestSendCourseUpdate(ScheduleUpsellTestMixin, ScheduleSendEmailTestMixin,
|
||||
enrollment.schedule.save()
|
||||
|
||||
with patch.object(tasks, 'ace') as mock_ace:
|
||||
self.task.apply(kwargs=dict( # pylint: disable=no-value-for-parameter
|
||||
self.task().apply(kwargs=dict( # pylint: disable=no-value-for-parameter
|
||||
site_id=self.site_config.site.id,
|
||||
target_day_str=serialize(target_day),
|
||||
day_offset=offset,
|
||||
|
||||
@@ -52,7 +52,7 @@ class TestUpgradeReminder(ScheduleSendEmailTestMixin, CacheIsolationTestCase):
|
||||
enrollment__mode=CourseMode.VERIFIED if is_verified else CourseMode.AUDIT,
|
||||
)
|
||||
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset,
|
||||
bin_num=self._calculate_bin_for_user(schedule.enrollment.user),
|
||||
))
|
||||
@@ -76,7 +76,7 @@ class TestUpgradeReminder(ScheduleSendEmailTestMixin, CacheIsolationTestCase):
|
||||
with patch.object(self.task, 'async_send_task') as mock_schedule_send:
|
||||
mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args[1])
|
||||
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset,
|
||||
bin_num=self._calculate_bin_for_user(user),
|
||||
))
|
||||
@@ -95,7 +95,7 @@ class TestUpgradeReminder(ScheduleSendEmailTestMixin, CacheIsolationTestCase):
|
||||
schedule = self._schedule_factory()
|
||||
schedule.enrollment.course.modes.filter(mode_slug=CourseMode.VERIFIED).delete()
|
||||
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset,
|
||||
bin_num=self._calculate_bin_for_user(schedule.enrollment.user),
|
||||
))
|
||||
|
||||
@@ -40,7 +40,7 @@ class ScheduleUpsellTestMixin(object):
|
||||
sent_messages = []
|
||||
with patch.object(self.task, 'async_send_task') as mock_schedule_send:
|
||||
mock_schedule_send.apply_async = lambda args, *_a, **_kw: sent_messages.append(args[1])
|
||||
self.task.apply(kwargs=dict(
|
||||
self.task().apply(kwargs=dict(
|
||||
site_id=self.site_config.site.id, target_day_str=serialize(target_day), day_offset=offset,
|
||||
bin_num=self._calculate_bin_for_user(schedule.enrollment.user),
|
||||
))
|
||||
|
||||
@@ -2,7 +2,7 @@ import datetime
|
||||
import logging
|
||||
|
||||
import analytics
|
||||
from celery.task import task, Task
|
||||
from celery import task
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.sites.models import Site
|
||||
@@ -10,6 +10,8 @@ from django.core.exceptions import ValidationError
|
||||
|
||||
from django.db.utils import DatabaseError
|
||||
|
||||
from celery_utils.logged_task import LoggedTask
|
||||
from celery_utils.persist_on_failure import LoggedPersistOnFailureTask
|
||||
from edx_ace import ace
|
||||
from edx_ace.message import Message
|
||||
from edx_ace.utils.date import deserialize, serialize
|
||||
@@ -36,7 +38,7 @@ UPGRADE_REMINDER_LOG_PREFIX = 'Upgrade Reminder'
|
||||
COURSE_UPDATE_LOG_PREFIX = 'Course Update'
|
||||
|
||||
|
||||
@task(bind=True, default_retry_delay=30, routing_key=ROUTING_KEY)
|
||||
@task(base=LoggedPersistOnFailureTask, bind=True, default_retry_delay=30, routing_key=ROUTING_KEY)
|
||||
def update_course_schedules(self, **kwargs):
|
||||
course_key = CourseKey.from_string(kwargs['course_id'])
|
||||
new_start_date = deserialize(kwargs['new_start_date_str'])
|
||||
@@ -53,7 +55,11 @@ def update_course_schedules(self, **kwargs):
|
||||
raise self.retry(kwargs=kwargs, exc=exc)
|
||||
|
||||
|
||||
class ScheduleMessageBaseTask(Task):
|
||||
class ScheduleMessageBaseTask(LoggedTask):
|
||||
"""
|
||||
Base class for top-level Schedule tasks that create subtasks
|
||||
for each Bin.
|
||||
"""
|
||||
ignore_result = True
|
||||
routing_key = ROUTING_KEY
|
||||
num_bins = resolvers.DEFAULT_NUM_BINS
|
||||
@@ -95,7 +101,7 @@ class ScheduleMessageBaseTask(Task):
|
||||
override_recipient_email,
|
||||
)
|
||||
cls.log_info('Launching task with args = %r', task_args)
|
||||
cls.apply_async(
|
||||
cls().apply_async(
|
||||
task_args,
|
||||
retry=False,
|
||||
)
|
||||
@@ -126,7 +132,7 @@ class ScheduleMessageBaseTask(Task):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@task(ignore_result=True, routing_key=ROUTING_KEY)
|
||||
@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
|
||||
def _recurring_nudge_schedule_send(site_id, msg_str):
|
||||
_schedule_send(
|
||||
msg_str,
|
||||
@@ -136,7 +142,7 @@ def _recurring_nudge_schedule_send(site_id, msg_str):
|
||||
)
|
||||
|
||||
|
||||
@task(ignore_result=True, routing_key=ROUTING_KEY)
|
||||
@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
|
||||
def _upgrade_reminder_schedule_send(site_id, msg_str):
|
||||
_schedule_send(
|
||||
msg_str,
|
||||
@@ -146,7 +152,7 @@ def _upgrade_reminder_schedule_send(site_id, msg_str):
|
||||
)
|
||||
|
||||
|
||||
@task(ignore_result=True, routing_key=ROUTING_KEY)
|
||||
@task(base=LoggedTask, ignore_result=True, routing_key=ROUTING_KEY)
|
||||
def _course_update_schedule_send(site_id, msg_str):
|
||||
_schedule_send(
|
||||
msg_str,
|
||||
|
||||
@@ -38,7 +38,7 @@ djangorestframework-jwt==1.11.0
|
||||
enum34==1.1.6
|
||||
edx-ace==0.1.6
|
||||
edx-ccx-keys==0.2.1
|
||||
edx-celeryutils==0.2.6
|
||||
edx-celeryutils==0.2.7
|
||||
edx-drf-extensions==1.2.3
|
||||
edx-i18n-tools==0.3.10
|
||||
edx-lint==0.4.3
|
||||
|
||||
@@ -104,6 +104,7 @@ git+https://github.com/edx/edx-proctoring.git@1.3.1#egg=edx-proctoring==1.3.1
|
||||
# This is here because all of the other XBlocks are located here. However, it is published to PyPI and will be installed that way
|
||||
xblock-review==1.1.1
|
||||
|
||||
|
||||
# Third Party XBlocks
|
||||
|
||||
git+https://github.com/mitodl/edx-sga.git@d019b8a050c056db535e3ff13c93096145a932de#egg=edx-sga==0.7.1
|
||||
|
||||
Reference in New Issue
Block a user