Move binning queries into resolvers.py to be closer to the Resolvers where they will live

This commit is contained in:
Calen Pennington
2017-10-17 21:03:08 -04:00
parent 00d8b2a492
commit 31db37eaf9
2 changed files with 260 additions and 242 deletions

View File

@@ -1,9 +1,22 @@
import datetime
from itertools import groupby
import logging
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.staticfiles.templatetags.staticfiles import static
from django.core.urlresolvers import reverse
from django.db.models import F, Min, Q
from django.utils.formats import dateformat, get_format
from edx_ace.recipient_resolver import RecipientResolver
from edx_ace.utils.date import serialize
from openedx.core.djangoapps.schedules.models import ScheduleConfig
from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid
from openedx.core.djangoapps.schedules.config import COURSE_UPDATE_WAFFLE_FLAG
from openedx.core.djangoapps.schedules.exceptions import CourseUpdateDoesNotExist
from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig
from openedx.core.djangoapps.schedules.tasks import (
DEFAULT_NUM_BINS,
RECURRING_NUDGE_NUM_BINS,
@@ -14,8 +27,19 @@ from openedx.core.djangoapps.schedules.tasks import (
course_update_schedule_bin,
)
from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin
from openedx.core.djangoapps.schedules.template_context import (
absolute_url,
encode_url,
encode_urls_in_dict,
get_base_template_context
)
from openedx.core.djangoapps.site_configuration.models import SiteConfiguration
from request_cache.middleware import request_cached
from xmodule.modulestore.django import modulestore
LOG = logging.getLogger(__name__)
class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver):
"""
@@ -109,6 +133,118 @@ class ScheduleStartResolver(BinnedSchedulesBaseResolver):
self.log_prefix = 'Scheduled Nudge'
def get_schedules_with_target_date_by_bin_and_orgs(schedule_date_field, current_datetime, target_datetime, bin_num,
num_bins=DEFAULT_NUM_BINS, org_list=None, exclude_orgs=False,
order_by='enrollment__user__id'):
"""
Returns Schedules with the target_date, related to Users whose id matches the bin_num, and filtered by org_list.
Arguments:
schedule_date_field -- string field name to query on the User's Schedule model
current_datetime -- datetime that will be used as "right now" in the query
target_datetime -- datetime that the User's Schedule's schedule_date_field value should fall under
bin_num -- int for selecting the bin of Users whose id % num_bins == bin_num
num_bin -- int specifying the number of bins to separate the Users into (default: DEFAULT_NUM_BINS)
org_list -- list of course_org names (strings) that the returned Schedules must or must not be in (default: None)
exclude_orgs -- boolean indicating whether the returned Schedules should exclude (True) the course_orgs in org_list
or strictly include (False) them (default: False)
order_by -- string for field to sort the resulting Schedules by
"""
target_day = _get_datetime_beginning_of_day(target_datetime)
schedule_day_equals_target_day_filter = {
'courseenrollment__schedule__{}__gte'.format(schedule_date_field): target_day,
'courseenrollment__schedule__{}__lt'.format(schedule_date_field): target_day + datetime.timedelta(days=1),
}
users = User.objects.filter(
courseenrollment__is_active=True,
**schedule_day_equals_target_day_filter
).annotate(
id_mod=F('id') % num_bins
).filter(
id_mod=bin_num
)
schedule_day_equals_target_day_filter = {
'{}__gte'.format(schedule_date_field): target_day,
'{}__lt'.format(schedule_date_field): target_day + datetime.timedelta(days=1),
}
schedules = Schedule.objects.select_related(
'enrollment__user__profile',
'enrollment__course',
).prefetch_related(
'enrollment__course__modes'
).filter(
Q(enrollment__course__end__isnull=True) | Q(
enrollment__course__end__gte=current_datetime),
enrollment__user__in=users,
enrollment__is_active=True,
**schedule_day_equals_target_day_filter
).order_by(order_by)
if org_list is not None:
if exclude_orgs:
schedules = schedules.exclude(enrollment__course__org__in=org_list)
else:
schedules = schedules.filter(enrollment__course__org__in=org_list)
if "read_replica" in settings.DATABASES:
schedules = schedules.using("read_replica")
LOG.debug('Query = %r', schedules.query.sql_with_params())
with function_trace('schedule_query_set_evaluation'):
# This will run the query and cache all of the results in memory.
num_schedules = len(schedules)
# This should give us a sense of the volume of data being processed by each task.
set_custom_metric('num_schedules', num_schedules)
return schedules
def _recurring_nudge_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id)
for schedule in user_schedules]
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'course_name': first_schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])),
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
# Information for including upsell messaging in template.
_add_upsell_button_information_to_template_context(
user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
def _get_datetime_beginning_of_day(dt):
"""
Truncates hours, minutes, seconds, and microseconds to zero on given datetime.
"""
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
class UpgradeReminderResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose verified upgrade deadline is at ``self.current_date`` + ``day_offset``.
@@ -122,6 +258,73 @@ class UpgradeReminderResolver(BinnedSchedulesBaseResolver):
self.log_prefix = 'Upgrade Reminder'
def _upgrade_reminder_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='upgrade_deadline',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
first_schedule = user_schedules[0]
template_context = get_base_template_context(self.site)
template_context.update({
'student_name': user.profile.name,
'course_links': [
{
'url': absolute_url(self.site, reverse('course_root', args=[str(s.enrollment.course_id)])),
'name': s.enrollment.course.display_name
} for s in user_schedules
],
'first_course_name': first_schedule.enrollment.course.display_name,
'cert_image': absolute_url(self.site, static('course_experience/images/verified-cert.png')),
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
_add_upsell_button_information_to_template_context(user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
def _add_upsell_button_information_to_template_context(user, schedule, template_context):
enrollment = schedule.enrollment
course = enrollment.course
verified_upgrade_link = _get_link_to_purchase_verified_certificate(
user, schedule)
has_verified_upgrade_link = verified_upgrade_link is not None
if has_verified_upgrade_link:
template_context['upsell_link'] = verified_upgrade_link
template_context['user_schedule_upgrade_deadline_time'] = dateformat.format(
enrollment.dynamic_upgrade_deadline,
get_format(
'DATE_FORMAT',
lang=course.language,
use_l10n=True
)
)
template_context['show_upsell'] = has_verified_upgrade_link
def _get_link_to_purchase_verified_certificate(a_user, a_schedule):
enrollment = a_schedule.enrollment
if enrollment.dynamic_upgrade_deadline is None or not verified_upgrade_link_is_valid(enrollment):
return None
return verified_upgrade_deadline_link(a_user, enrollment.course)
class CourseUpdateResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset`` and the
@@ -134,3 +337,53 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver):
def __init__(self, *args, **kwargs):
super(CourseUpdateResolver, self).__init__(*args, **kwargs)
self.log_prefix = 'Course Update'
def _course_update_schedules_for_bin(site, current_datetime, target_datetime, day_offset, bin_num, org_list,
exclude_orgs=False):
week_num = abs(day_offset) / 7
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=COURSE_UPDATE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
order_by='enrollment__course',
)
for schedule in schedules:
enrollment = schedule.enrollment
try:
week_summary = get_course_week_summary(
enrollment.course_id, week_num)
except CourseUpdateDoesNotExist:
continue
user = enrollment.user
course_id_str = str(enrollment.course_id)
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_name': schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(schedule.enrollment.course_id)])),
'week_num': week_num,
'week_summary': week_summary,
# This is used by the bulk email optout policy
'course_ids': [course_id_str],
})
yield (user, schedule.enrollment.course.language, template_context)
@request_cached
def get_course_week_summary(course_id, week_num):
if COURSE_UPDATE_WAFFLE_FLAG.is_enabled(course_id):
course = modulestore().get_course(course_id)
return course.week_summary(week_num)
else:
raise CourseUpdateDoesNotExist()

View File

@@ -1,15 +1,12 @@
import datetime
from itertools import groupby
import logging
from celery.task import task
from django.conf import settings
from django.contrib.auth.models import User
from django.contrib.sites.models import Site
from django.contrib.staticfiles.templatetags.staticfiles import static
from django.core.exceptions import ValidationError
from django.core.urlresolvers import reverse
from django.db.models import F, Q
from django.db.utils import DatabaseError
from django.utils.formats import dateformat, get_format
@@ -19,16 +16,12 @@ from edx_ace.recipient import Recipient
from edx_ace.utils.date import deserialize
from opaque_keys.edx.keys import CourseKey
from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid
from openedx.core.djangoapps.monitoring_utils import set_custom_metric, function_trace
from request_cache.middleware import request_cached
from xmodule.modulestore.django import modulestore
from openedx.core.djangoapps.schedules.config import COURSE_UPDATE_WAFFLE_FLAG
from openedx.core.djangoapps.schedules.exceptions import CourseUpdateDoesNotExist
from openedx.core.djangoapps.schedules.message_type import ScheduleMessageType
from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig
from openedx.core.djangoapps.schedules.template_context import absolute_url, get_base_template_context
from openedx.core.djangoapps.schedules.message_type import ScheduleMessageType
from openedx.core.djangoapps.schedules import resolvers
LOG = logging.getLogger(__name__)
@@ -95,7 +88,7 @@ def recurring_nudge_schedule_bin(
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in _recurring_nudge_schedules_for_bin(
for (user, language, context) in resolvers._recurring_nudge_schedules_for_bin(
site,
current_datetime,
target_datetime,
@@ -131,40 +124,6 @@ def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_of
set_custom_metric('send_uuid', message_type.uuid)
def _recurring_nudge_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'course_name': first_schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])),
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
# Information for including upsell messaging in template.
_add_upsell_button_information_to_template_context(user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
class UpgradeReminder(ScheduleMessageType):
pass
@@ -181,7 +140,7 @@ def upgrade_reminder_schedule_bin(
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in _upgrade_reminder_schedules_for_bin(
for (user, language, context) in resolvers._upgrade_reminder_schedules_for_bin(
site,
current_datetime,
target_datetime,
@@ -211,47 +170,6 @@ def _upgrade_reminder_schedule_send(site_id, msg_str):
ace.send(msg)
def _upgrade_reminder_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='upgrade_deadline',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_links': [
{
'url': absolute_url(site, reverse('course_root', args=[str(s.enrollment.course_id)])),
'name': s.enrollment.course.display_name
} for s in user_schedules
],
'first_course_name': first_schedule.enrollment.course.display_name,
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
'cert_image': absolute_url(site, static('course_experience/images/verified-cert.png')),
})
_add_upsell_button_information_to_template_context(user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
class CourseUpdate(ScheduleMessageType):
pass
@@ -268,7 +186,7 @@ def course_update_schedule_bin(
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in _course_update_schedules_for_bin(
for (user, language, context) in resolvers._course_update_schedules_for_bin(
site,
current_datetime,
target_datetime,
@@ -297,156 +215,3 @@ def _course_update_schedule_send(site_id, msg_str):
msg = Message.from_string(msg_str)
ace.send(msg)
def _course_update_schedules_for_bin(site, current_datetime, target_datetime, day_offset, bin_num, org_list,
exclude_orgs=False):
week_num = abs(day_offset) / 7
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=COURSE_UPDATE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
order_by='enrollment__course',
)
for schedule in schedules:
enrollment = schedule.enrollment
try:
week_summary = get_course_week_summary(enrollment.course_id, week_num)
except CourseUpdateDoesNotExist:
continue
user = enrollment.user
course_id_str = str(enrollment.course_id)
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_name': schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(schedule.enrollment.course_id)])),
'week_num': week_num,
'week_summary': week_summary,
# This is used by the bulk email optout policy
'course_ids': [course_id_str],
})
yield (user, schedule.enrollment.course.language, template_context)
@request_cached
def get_course_week_summary(course_id, week_num):
if COURSE_UPDATE_WAFFLE_FLAG.is_enabled(course_id):
course = modulestore().get_course(course_id)
return course.week_summary(week_num)
else:
raise CourseUpdateDoesNotExist()
def get_schedules_with_target_date_by_bin_and_orgs(schedule_date_field, current_datetime, target_datetime, bin_num,
num_bins=DEFAULT_NUM_BINS, org_list=None, exclude_orgs=False,
order_by='enrollment__user__id'):
"""
Returns Schedules with the target_date, related to Users whose id matches the bin_num, and filtered by org_list.
Arguments:
schedule_date_field -- string field name to query on the User's Schedule model
current_datetime -- datetime that will be used as "right now" in the query
target_datetime -- datetime that the User's Schedule's schedule_date_field value should fall under
bin_num -- int for selecting the bin of Users whose id % num_bins == bin_num
num_bin -- int specifying the number of bins to separate the Users into (default: DEFAULT_NUM_BINS)
org_list -- list of course_org names (strings) that the returned Schedules must or must not be in (default: None)
exclude_orgs -- boolean indicating whether the returned Schedules should exclude (True) the course_orgs in org_list
or strictly include (False) them (default: False)
order_by -- string for field to sort the resulting Schedules by
"""
target_day = _get_datetime_beginning_of_day(target_datetime)
schedule_day_equals_target_day_filter = {
'courseenrollment__schedule__{}__gte'.format(schedule_date_field): target_day,
'courseenrollment__schedule__{}__lt'.format(schedule_date_field): target_day + datetime.timedelta(days=1),
}
users = User.objects.filter(
courseenrollment__is_active=True,
**schedule_day_equals_target_day_filter
).annotate(
id_mod=F('id') % num_bins
).filter(
id_mod=bin_num
)
schedule_day_equals_target_day_filter = {
'{}__gte'.format(schedule_date_field): target_day,
'{}__lt'.format(schedule_date_field): target_day + datetime.timedelta(days=1),
}
schedules = Schedule.objects.select_related(
'enrollment__user__profile',
'enrollment__course',
).prefetch_related(
'enrollment__course__modes'
).filter(
Q(enrollment__course__end__isnull=True) | Q(enrollment__course__end__gte=current_datetime),
enrollment__user__in=users,
enrollment__is_active=True,
**schedule_day_equals_target_day_filter
).order_by(order_by)
if org_list is not None:
if exclude_orgs:
schedules = schedules.exclude(enrollment__course__org__in=org_list)
else:
schedules = schedules.filter(enrollment__course__org__in=org_list)
if "read_replica" in settings.DATABASES:
schedules = schedules.using("read_replica")
LOG.debug('Query = %r', schedules.query.sql_with_params())
with function_trace('schedule_query_set_evaluation'):
# This will run the query and cache all of the results in memory.
num_schedules = len(schedules)
# This should give us a sense of the volume of data being processed by each task.
set_custom_metric('num_schedules', num_schedules)
return schedules
def _add_upsell_button_information_to_template_context(user, schedule, template_context):
enrollment = schedule.enrollment
course = enrollment.course
verified_upgrade_link = _get_link_to_purchase_verified_certificate(user, schedule)
has_verified_upgrade_link = verified_upgrade_link is not None
if has_verified_upgrade_link:
template_context['upsell_link'] = verified_upgrade_link
template_context['user_schedule_upgrade_deadline_time'] = dateformat.format(
enrollment.dynamic_upgrade_deadline,
get_format(
'DATE_FORMAT',
lang=course.language,
use_l10n=True
)
)
template_context['show_upsell'] = has_verified_upgrade_link
def _get_link_to_purchase_verified_certificate(a_user, a_schedule):
enrollment = a_schedule.enrollment
if enrollment.dynamic_upgrade_deadline is None or not verified_upgrade_link_is_valid(enrollment):
return None
return verified_upgrade_deadline_link(a_user, enrollment.course)
def _get_datetime_beginning_of_day(dt):
"""
Truncates hours, minutes, seconds, and microseconds to zero on given datetime.
"""
return dt.replace(hour=0, minute=0, second=0, microsecond=0)