Move bin-task enqueuing into a classmethod (rather than having it on the RecipientResolvers

This commit is contained in:
Calen Pennington
2017-10-18 14:01:40 -04:00
parent 352fa067ae
commit d222b2d718
11 changed files with 456 additions and 464 deletions

View File

@@ -8,7 +8,6 @@ from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin
class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand):
resolver_class = None # define in subclass
async_send_task = None # define in subclass
def add_arguments(self, parser):
@@ -24,20 +23,27 @@ class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand):
parser.add_argument('site_domain_name')
def handle(self, *args, **options):
resolver = self.make_resolver(*args, **options)
self.send_emails(resolver, *args, **options)
self.log_debug('Args = %r', options)
def make_resolver(self, *args, **options):
current_date = datetime.datetime(
*[int(x) for x in options['date'].split('-')],
tzinfo=pytz.UTC
)
self.log_debug('Args = %r', options)
self.log_debug('Current date = %s', current_date.isoformat())
site = Site.objects.get(domain__iexact=options['site_domain_name'])
self.log_debug('Running for site %s', site.domain)
return self.resolver_class(site, current_date, async_send_task=self.async_send_task)
def send_emails(self, resolver, *args, **options):
override_recipient_email = options.get('override_recipient_email')
self.send_emails(site, current_date, override_recipient_email)
def enqueue(self, day_offset, site, current_date, override_recipient_email=None):
self.async_send_task.enqueue(
site,
current_date,
day_offset,
override_recipient_email,
)
def send_emails(self, *args, **kwargs):
pass # define in subclass

View File

@@ -1,16 +1,14 @@
from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand
from openedx.core.djangoapps.schedules.resolvers import CourseUpdateResolver
from openedx.core.djangoapps.schedules.tasks import ScheduleCourseUpdate
class Command(SendEmailBaseCommand):
resolver_class = CourseUpdateResolver
async_send_task = ScheduleCourseUpdate
def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs)
self.log_prefix = 'Upgrade Reminder'
def send_emails(self, resolver, *args, **options):
def send_emails(self, *args, **kwargs):
for day_offset in xrange(-7, -77, -7):
resolver.send(day_offset, options.get('override_recipient_email'))
self.enqueue(day_offset, *args, **kwargs)

View File

@@ -1,16 +1,14 @@
from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand
from openedx.core.djangoapps.schedules.resolvers import ScheduleStartResolver
from openedx.core.djangoapps.schedules.tasks import ScheduleRecurringNudge
class Command(SendEmailBaseCommand):
resolver_class = ScheduleStartResolver
async_send_task = ScheduleRecurringNudge
def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs)
self.log_prefix = 'Scheduled Nudge'
def send_emails(self, resolver, *args, **options):
def send_emails(self, *args, **kwargs):
for day_offset in (-3, -10):
resolver.send(day_offset, options.get('override_recipient_email'))
self.enqueue(day_offset, *args, **kwargs)

View File

@@ -1,15 +1,13 @@
from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand
from openedx.core.djangoapps.schedules.resolvers import UpgradeReminderResolver
from openedx.core.djangoapps.schedules.tasks import ScheduleUpgradeReminder
class Command(SendEmailBaseCommand):
resolver_class = UpgradeReminderResolver
async_send_task = ScheduleUpgradeReminder
def __init__(self, *args, **kwargs):
super(Command, self).__init__(*args, **kwargs)
self.log_prefix = 'Upgrade Reminder'
def send_emails(self, resolver, *args, **options):
resolver.send(2, options.get('override_recipient_email'))
def send_emails(self, *args, **kwargs):
self.enqueue(2, *args, **kwargs)

View File

@@ -18,24 +18,13 @@ from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_un
class TestSendEmailBaseCommand(CacheIsolationTestCase):
def setUp(self):
self.command = SendEmailBaseCommand()
def test_init_resolver_class(self):
assert self.command.resolver_class is None
def test_make_resolver(self):
with patch.object(self.command, 'resolver_class') as resolver_class:
example_site = SiteFactory(domain='example.com')
self.command.make_resolver(site_domain_name='example.com', date='2017-09-29')
resolver_class.assert_called_once_with(
example_site,
datetime.datetime(2017, 9, 29, tzinfo=pytz.UTC),
async_send_task=None,
)
self.site = SiteFactory()
def test_handle(self):
with patch.object(self.command, 'make_resolver') as make_resolver:
make_resolver.return_value = 'resolver'
with patch.object(self.command, 'send_emails') as send_emails:
self.command.handle(date='2017-09-29')
make_resolver.assert_called_once_with(date='2017-09-29')
send_emails.assert_called_once_with('resolver', date='2017-09-29')
with patch.object(self.command, 'send_emails') as send_emails:
self.command.handle(site_domain_name=self.site.domain, date='2017-09-29')
send_emails.assert_called_once_with(
self.site,
datetime.datetime(2017, 9, 29, tzinfo=pytz.UTC),
None
)

View File

@@ -60,35 +60,33 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True)
@patch.object(nudge.Command, 'resolver_class')
def test_handle(self, mock_resolver):
@patch.object(nudge.Command, 'async_send_task')
def test_handle(self, mock_send):
test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
nudge.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain)
mock_resolver.assert_called_with(
self.site_config.site,
test_day,
async_send_task=nudge.Command.async_send_task,
)
for day in (-3, -10):
mock_resolver().send.assert_any_call(day, None)
mock_send.enqueue.assert_any_call(
self.site_config.site,
test_day,
day,
None
)
@patch.object(tasks, 'ace')
def test_resolver_send(self, mock_ace):
current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
mock_schedule_bin = Mock()
nudge.ScheduleStartResolver(self.site_config.site, current_day, mock_schedule_bin).send(-3)
test_day = current_day + datetime.timedelta(days=-3)
self.assertFalse(mock_schedule_bin.called)
mock_schedule_bin.apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), -3, 0, [], True, None),
retry=False,
)
mock_schedule_bin.apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), -3, resolvers.RECURRING_NUDGE_NUM_BINS - 1, [], True, None),
retry=False,
)
self.assertFalse(mock_ace.send.called)
with patch.object(tasks.ScheduleRecurringNudge, 'apply_async') as mock_apply_async:
tasks.ScheduleRecurringNudge.enqueue(self.site_config.site, current_day, -3)
test_day = current_day + datetime.timedelta(days=-3)
mock_apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), -3, 0, [], True, None),
retry=False,
)
mock_apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), -3, resolvers.RECURRING_NUDGE_NUM_BINS - 1, [], True, None),
retry=False,
)
self.assertFalse(mock_ace.send.called)
@ddt.data(1, 10, 100)
@patch.object(tasks, 'ace')
@@ -180,11 +178,12 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase):
mock_schedule_bin = Mock()
current_datetime = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
nudge.ScheduleStartResolver(
tasks.ScheduleRecurringNudge.enqueue(
self.site_config.site,
current_datetime,
mock_schedule_bin,
).send(3)
3
)
self.assertFalse(mock_schedule_bin.called)
self.assertFalse(mock_schedule_bin.apply_async.called)
self.assertFalse(mock_ace.send.called)

View File

@@ -63,36 +63,34 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True)
@patch.object(reminder.Command, 'resolver_class')
def test_handle(self, mock_resolver):
@patch.object(reminder.Command, 'async_send_task')
def test_handle(self, mock_send):
test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
reminder.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain)
mock_resolver.assert_called_with(
mock_send.enqueue.assert_called_with(
self.site_config.site,
test_day,
async_send_task=reminder.Command.async_send_task,
2,
None
)
mock_resolver().send.assert_any_call(2, None)
@patch.object(tasks, 'ace')
def test_resolver_send(self, mock_ace):
current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
test_day = current_day + datetime.timedelta(days=2)
ScheduleFactory.create(upgrade_deadline=datetime.datetime(2017, 8, 3, 15, 34, 30, tzinfo=pytz.UTC))
mock_schedule_bin = Mock()
reminder.UpgradeReminderResolver(self.site_config.site, current_day, mock_schedule_bin).send(2)
self.assertFalse(mock_schedule_bin.called)
mock_schedule_bin.apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), 2, 0, [], True, None),
retry=False,
)
mock_schedule_bin.apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), 2, resolvers.UPGRADE_REMINDER_NUM_BINS - 1, [], True, None),
retry=False,
)
self.assertFalse(mock_ace.send.called)
with patch.object(tasks.ScheduleUpgradeReminder, 'apply_async') as mock_apply_async:
tasks.ScheduleUpgradeReminder.enqueue(self.site_config.site, current_day, 2)
mock_apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), 2, 0, [], True, None),
retry=False,
)
mock_apply_async.assert_any_call(
(self.site_config.site.id, serialize(test_day), 2, resolvers.UPGRADE_REMINDER_NUM_BINS - 1, [], True, None),
retry=False,
)
self.assertFalse(mock_ace.send.called)
@ddt.data(1, 10, 100)
@patch.object(tasks, 'ace')
@@ -164,11 +162,11 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase):
mock_schedule_bin = Mock()
current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC)
reminder.UpgradeReminderResolver(
tasks.ScheduleUpgradeReminder.enqueue(
self.site_config.site,
current_day,
async_send_task=mock_schedule_bin,
).send(3)
day_offset=3,
)
self.assertFalse(mock_schedule_bin.called)
self.assertFalse(mock_schedule_bin.apply_async.called)
self.assertFalse(mock_ace.send.called)

View File

@@ -55,71 +55,8 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver):
num_bins -- the int number of bins to split the users into
enqueue_config_var -- the string field name of the config variable on ScheduleConfig to check before enqueuing
"""
num_bins = DEFAULT_NUM_BINS
enqueue_config_var = None # define in subclass
def __init__(self, site, current_date, async_send_task, *args, **kwargs):
super(BinnedSchedulesBaseResolver, self).__init__(*args, **kwargs)
self.site = site
self.current_date = current_date.replace(hour=0, minute=0, second=0)
self.async_send_task = async_send_task
def send(self, day_offset, override_recipient_email=None):
if not self.is_enqueue_enabled():
self.log_debug('Message queuing disabled for site %s', self.site.domain)
return
exclude_orgs, org_list = self.get_course_org_filter()
target_date = self.current_date + datetime.timedelta(days=day_offset)
self.log_debug('Target date = %s', target_date.isoformat())
for bin in range(self.num_bins):
task_args = (
self.site.id, serialize(target_date), day_offset, bin, org_list, exclude_orgs, override_recipient_email,
)
self.log_debug('Launching task with args = %r', task_args)
self.async_send_task.apply_async(
task_args,
retry=False,
)
def is_enqueue_enabled(self):
if self.enqueue_config_var:
return getattr(ScheduleConfig.current(self.site), self.enqueue_config_var)
return False
def get_course_org_filter(self):
"""
Given the configuration of sites, get the list of orgs that should be included or excluded from this send.
Returns:
tuple: Returns a tuple (exclude_orgs, org_list). If exclude_orgs is True, then org_list is a list of the
only orgs that should be included in this send. If exclude_orgs is False, then org_list is a list of
orgs that should be excluded from this send. All other orgs should be included.
"""
try:
site_config = SiteConfiguration.objects.get(site_id=self.site.id)
org_list = site_config.get_value('course_org_filter')
exclude_orgs = False
if not org_list:
not_orgs = set()
for other_site_config in SiteConfiguration.objects.all():
other = other_site_config.get_value('course_org_filter')
if not isinstance(other, list):
if other is not None:
not_orgs.add(other)
else:
not_orgs.update(other)
org_list = list(not_orgs)
exclude_orgs = True
elif not isinstance(org_list, list):
org_list = [org_list]
except SiteConfiguration.DoesNotExist:
org_list = None
exclude_orgs = False
finally:
return exclude_orgs, org_list
def send(self, msg_type):
pass
def get_schedules_with_target_date_by_bin_and_orgs(schedule_date_field, current_datetime, target_datetime, bin_num,
num_bins=DEFAULT_NUM_BINS, org_list=None, exclude_orgs=False,
@@ -196,18 +133,6 @@ class RecurringNudge(ScheduleMessageType):
self.name = "recurringnudge_day{}".format(day)
class ScheduleStartResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``.
"""
num_bins = RECURRING_NUDGE_NUM_BINS
enqueue_config_var = 'enqueue_recurring_nudge'
def __init__(self, *args, **kwargs):
super(ScheduleStartResolver, self).__init__(*args, **kwargs)
self.log_prefix = 'Scheduled Nudge'
def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset):
# This identifies the type of message being sent, for example: schedules.recurring_nudge3.
set_custom_metric('message_name', '{0}.{1}'.format(
@@ -225,73 +150,81 @@ def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_of
set_custom_metric('send_uuid', message_type.uuid)
def recurring_nudge_schedule_bin(
async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = RecurringNudge(abs(day_offset))
site = Site.objects.get(id=site_id)
class ScheduleStartResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``.
"""
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
def __init__(self, *args, **kwargs):
super(ScheduleStartResolver, self).__init__(*args, **kwargs)
self.log_prefix = 'Scheduled Nudge'
for (user, language, context) in _recurring_nudge_schedules_for_bin(
site,
current_datetime,
target_datetime,
bin_num,
org_list,
exclude_orgs
def recurring_nudge_schedule_bin(
self, async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = RecurringNudge(abs(day_offset))
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in self._recurring_nudge_schedules_for_bin(
site,
current_datetime,
target_datetime,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
def _recurring_nudge_schedules_for_bin(self, site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
LOG.debug('Recurring Nudge: Query = %r', schedules.query.sql_with_params())
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
def _recurring_nudge_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
'course_name': first_schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])),
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id)
for schedule in user_schedules]
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
# Information for including upsell messaging in template.
_add_upsell_button_information_to_template_context(
user, first_schedule, template_context)
'course_name': first_schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])),
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
# Information for including upsell messaging in template.
_add_upsell_button_information_to_template_context(
user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
def _get_datetime_beginning_of_day(dt):
@@ -308,81 +241,78 @@ class UpgradeReminderResolver(BinnedSchedulesBaseResolver):
"""
Send a message to all users whose verified upgrade deadline is at ``self.current_date`` + ``day_offset``.
"""
num_bins = UPGRADE_REMINDER_NUM_BINS
enqueue_config_var = 'enqueue_upgrade_reminder'
def __init__(self, *args, **kwargs):
super(UpgradeReminderResolver, self).__init__(*args, **kwargs)
self.log_prefix = 'Upgrade Reminder'
def upgrade_reminder_schedule_bin(
async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = UpgradeReminder()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num,target_day_str, day_offset)
for (user, language, context) in _upgrade_reminder_schedules_for_bin(
site,
current_datetime,
target_datetime,
bin_num,
org_list,
exclude_orgs
def upgrade_reminder_schedule_bin(
self, async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = UpgradeReminder()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num,
target_day_str, day_offset)
for (user, language, context) in self._upgrade_reminder_schedules_for_bin(
site,
current_datetime,
target_datetime,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
def _upgrade_reminder_schedules_for_bin(self, site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='upgrade_deadline',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
def _upgrade_reminder_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False):
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='upgrade_deadline',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=RECURRING_NUDGE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
)
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'course_links': [
{
'url': absolute_url(site, reverse('course_root', args=[str(s.enrollment.course_id)])),
'name': s.enrollment.course.display_name
} for s in user_schedules
],
'first_course_name': first_schedule.enrollment.course.display_name,
'cert_image': absolute_url(site, static('course_experience/images/verified-cert.png')),
for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user):
user_schedules = list(user_schedules)
course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules]
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
first_schedule = user_schedules[0]
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'course_links': [
{
'url': absolute_url(site, reverse('course_root', args=[str(s.enrollment.course_id)])),
'name': s.enrollment.course.display_name
} for s in user_schedules
],
'first_course_name': first_schedule.enrollment.course.display_name,
'cert_image': absolute_url(site, static('course_experience/images/verified-cert.png')),
_add_upsell_button_information_to_template_context(user, first_schedule, template_context)
# This is used by the bulk email optout policy
'course_ids': course_id_strs,
})
_add_upsell_button_information_to_template_context(user, first_schedule, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
yield (user, first_schedule.enrollment.course.language, template_context)
def _add_upsell_button_information_to_template_context(user, schedule, template_context):
@@ -424,87 +354,83 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver):
Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset`` and the
course has updates.
"""
num_bins = COURSE_UPDATE_NUM_BINS
enqueue_config_var = 'enqueue_course_update'
def __init__(self, *args, **kwargs):
super(CourseUpdateResolver, self).__init__(*args, **kwargs)
self.log_prefix = 'Course Update'
def course_update_schedule_bin(
async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = CourseUpdate()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset)
for (user, language, context) in _course_update_schedules_for_bin(
site,
current_datetime,
target_datetime,
day_offset,
bin_num,
org_list,
exclude_orgs
def course_update_schedule_bin(
self, async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
target_datetime = deserialize(target_day_str)
# TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here
current_datetime = target_datetime - datetime.timedelta(days=day_offset)
msg_type = CourseUpdate()
site = Site.objects.get(id=site_id)
_annotate_for_monitoring(msg_type, site, bin_num,
target_day_str, day_offset)
for (user, language, context) in self._course_update_schedules_for_bin(
site,
current_datetime,
target_datetime,
day_offset,
bin_num,
org_list,
exclude_orgs
):
msg = msg_type.personalize(
Recipient(
user.username,
override_recipient_email or user.email,
),
language,
context,
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
def _course_update_schedules_for_bin(self, site, current_datetime, target_datetime, day_offset, bin_num, org_list,
exclude_orgs=False):
week_num = abs(day_offset) / 7
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=COURSE_UPDATE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
order_by='enrollment__course',
)
with function_trace('enqueue_send_task'):
async_send_task.apply_async(
(site_id, str(msg)), retry=False)
LOG.debug('Course Update: Query = %r', schedules.query.sql_with_params())
for schedule in schedules:
enrollment = schedule.enrollment
try:
week_summary = get_course_week_summary(
enrollment.course_id, week_num)
except CourseUpdateDoesNotExist:
continue
user = enrollment.user
course_id_str = str(enrollment.course_id)
def _course_update_schedules_for_bin(site, current_datetime, target_datetime, day_offset, bin_num, org_list,
exclude_orgs=False):
week_num = abs(day_offset) / 7
schedules = get_schedules_with_target_date_by_bin_and_orgs(
schedule_date_field='start',
current_datetime=current_datetime,
target_datetime=target_datetime,
bin_num=bin_num,
num_bins=COURSE_UPDATE_NUM_BINS,
org_list=org_list,
exclude_orgs=exclude_orgs,
order_by='enrollment__course',
)
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_name': schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(schedule.enrollment.course_id)])),
'week_num': week_num,
'week_summary': week_summary,
for schedule in schedules:
enrollment = schedule.enrollment
try:
week_summary = get_course_week_summary(
enrollment.course_id, week_num)
except CourseUpdateDoesNotExist:
continue
# This is used by the bulk email optout policy
'course_ids': [course_id_str],
})
user = enrollment.user
course_id_str = str(enrollment.course_id)
template_context = get_base_template_context(site)
template_context.update({
'student_name': user.profile.name,
'user_personal_address': user.profile.name if user.profile.name else user.username,
'course_name': schedule.enrollment.course.display_name,
'course_url': absolute_url(site, reverse('course_root', args=[str(schedule.enrollment.course_id)])),
'week_num': week_num,
'week_summary': week_summary,
# This is used by the bulk email optout policy
'course_ids': [course_id_str],
})
yield (user, schedule.enrollment.course.language, template_context)
yield (user, schedule.enrollment.course.language, template_context)
@request_cached

View File

@@ -13,13 +13,14 @@ from django.utils.formats import dateformat, get_format
from edx_ace import ace
from edx_ace.message import Message
from edx_ace.recipient import Recipient
from edx_ace.utils.date import deserialize
from edx_ace.utils.date import deserialize, serialize
from opaque_keys.edx.keys import CourseKey
from openedx.core.djangoapps.monitoring_utils import set_custom_metric
from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig
from openedx.core.djangoapps.schedules import resolvers
from openedx.core.djangoapps.site_configuration.models import SiteConfiguration
LOG = logging.getLogger(__name__)
@@ -68,12 +69,91 @@ def _recurring_nudge_schedule_send(site_id, msg_str):
class ScheduleMessageBaseTask(Task):
ignore_result = True
routing_key = ROUTING_KEY
num_bins = resolvers.DEFAULT_NUM_BINS
enqueue_config_var = None # define in subclass
log_prefix = None
@classmethod
def log_debug(cls, message, *args, **kwargs):
LOG.debug(cls.log_prefix + ': ' + message, *args, **kwargs)
@classmethod
def enqueue(cls, site, current_date, day_offset, override_recipient_email=None):
current_date = current_date.replace(hour=0, minute=0, second=0)
if not cls.is_enqueue_enabled(site):
cls.log_debug(
'Message queuing disabled for site %s', site.domain)
return
exclude_orgs, org_list = cls.get_course_org_filter(site)
target_date = current_date + datetime.timedelta(days=day_offset)
cls.log_debug('Target date = %s', target_date.isoformat())
for bin in range(cls.num_bins):
task_args = (
site.id,
serialize(target_date),
day_offset,
bin,
org_list,
exclude_orgs,
override_recipient_email,
)
cls.log_debug('Launching task with args = %r', task_args)
cls.apply_async(
task_args,
retry=False,
)
@classmethod
def is_enqueue_enabled(cls, site):
if cls.enqueue_config_var:
return getattr(ScheduleConfig.current(site), cls.enqueue_config_var)
return False
@classmethod
def get_course_org_filter(cls, site):
"""
Given the configuration of sites, get the list of orgs that should be included or excluded from this send.
Returns:
tuple: Returns a tuple (exclude_orgs, org_list). If exclude_orgs is True, then org_list is a list of the
only orgs that should be included in this send. If exclude_orgs is False, then org_list is a list of
orgs that should be excluded from this send. All other orgs should be included.
"""
try:
site_config = SiteConfiguration.objects.get(site_id=site.id)
org_list = site_config.get_value('course_org_filter')
exclude_orgs = False
if not org_list:
not_orgs = set()
for other_site_config in SiteConfiguration.objects.all():
other = other_site_config.get_value('course_org_filter')
if not isinstance(other, list):
if other is not None:
not_orgs.add(other)
else:
not_orgs.update(other)
org_list = list(not_orgs)
exclude_orgs = True
elif not isinstance(org_list, list):
org_list = [org_list]
except SiteConfiguration.DoesNotExist:
org_list = None
exclude_orgs = False
finally:
return exclude_orgs, org_list
class ScheduleRecurringNudge(ScheduleMessageBaseTask):
num_bins = resolvers.RECURRING_NUDGE_NUM_BINS
enqueue_config_var = 'enqueue_recurring_nudge'
log_prefix = 'Scheduled Nudge'
def run(
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
return resolvers.recurring_nudge_schedule_bin(
return resolvers.ScheduleStartResolver().recurring_nudge_schedule_bin(
_recurring_nudge_schedule_send,
site_id,
target_day_str,
@@ -86,10 +166,15 @@ class ScheduleRecurringNudge(ScheduleMessageBaseTask):
class ScheduleUpgradeReminder(ScheduleMessageBaseTask):
num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS
enqueue_config_var = 'enqueue_upgrade_reminder'
log_prefix = 'Course Update'
def run(
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
return resolvers.upgrade_reminder_schedule_bin(
return resolvers.UpgradeReminderResolver().upgrade_reminder_schedule_bin(
_upgrade_reminder_schedule_send,
site_id,
target_day_str,
@@ -111,10 +196,14 @@ def _upgrade_reminder_schedule_send(site_id, msg_str):
class ScheduleCourseUpdate(ScheduleMessageBaseTask):
num_bins = resolvers.COURSE_UPDATE_NUM_BINS
enqueue_config_var = 'enqueue_course_update'
log_prefix = 'Course Update'
def run(
self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None,
):
return resolvers.course_update_schedule_bin(
return resolvers.CourseUpdateResolver().course_update_schedule_bin(
_course_update_schedule_send,
site_id,
target_day_str,

View File

@@ -1,110 +0,0 @@
import datetime
from unittest import skipUnless
import ddt
from django.conf import settings
from mock import patch
from openedx.core.djangoapps.schedules.resolvers import BinnedSchedulesBaseResolver
from openedx.core.djangoapps.schedules.resolvers import DEFAULT_NUM_BINS
from openedx.core.djangoapps.schedules.tests.factories import ScheduleConfigFactory
from openedx.core.djangoapps.site_configuration.tests.factories import SiteConfigurationFactory, SiteFactory
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms
@ddt.ddt
@skip_unless_lms
@skipUnless('openedx.core.djangoapps.schedules.apps.SchedulesConfig' in settings.INSTALLED_APPS,
"Can't test schedules if the app isn't installed")
class TestBinnedSchedulesBaseResolver(CacheIsolationTestCase):
def setUp(self):
super(TestBinnedSchedulesBaseResolver, self).setUp()
self.site = SiteFactory.create()
self.site_config = SiteConfigurationFactory.create(site=self.site)
self.schedule_config = ScheduleConfigFactory.create(site=self.site)
def setup_resolver(self, site=None, current_date=None, async_send_task=None):
if site is None:
site = self.site
if current_date is None:
current_date = datetime.datetime.now()
resolver = BinnedSchedulesBaseResolver(self.site, current_date, async_send_task)
return resolver
def test_init_site(self):
resolver = self.setup_resolver()
assert resolver.site == self.site
def test_init_current_date(self):
current_time = datetime.datetime.now()
resolver = self.setup_resolver(current_date=current_time)
current_date = current_time.replace(hour=0, minute=0, second=0)
assert resolver.current_date == current_date
def test_init_async_send_task(self):
resolver = self.setup_resolver()
assert resolver.async_send_task is None
def test_init_num_bins(self):
resolver = self.setup_resolver()
assert resolver.num_bins == DEFAULT_NUM_BINS
def test_send_enqueue_disabled(self):
resolver = self.setup_resolver()
resolver.is_enqueue_enabled = lambda: False
with patch.object(resolver, 'async_send_task') as send:
with patch.object(resolver, 'log_debug') as log_debug:
resolver.send(day_offset=2)
log_debug.assert_called_once_with('Message queuing disabled for site %s', self.site.domain)
send.apply_async.assert_not_called()
@ddt.data(0, 2, -3)
def test_send_enqueue_enabled(self, day_offset):
resolver = self.setup_resolver()
resolver.is_enqueue_enabled = lambda: True
resolver.get_course_org_filter = lambda: (False, None)
with patch.object(resolver, 'async_send_task') as send:
with patch.object(resolver, 'log_debug') as log_debug:
resolver.send(day_offset=day_offset)
target_date = resolver.current_date + datetime.timedelta(day_offset)
log_debug.assert_any_call('Target date = %s', target_date.isoformat())
assert send.apply_async.call_count == DEFAULT_NUM_BINS
@ddt.data(True, False)
def test_is_enqueue_enabled(self, enabled):
resolver = self.setup_resolver()
resolver.enqueue_config_var = 'enqueue_recurring_nudge'
self.schedule_config.enqueue_recurring_nudge = enabled
self.schedule_config.save()
assert resolver.is_enqueue_enabled() == enabled
@ddt.unpack
@ddt.data(
('course1', ['course1']),
(['course1', 'course2'], ['course1', 'course2'])
)
def test_get_course_org_filter_include(self, course_org_filter, expected_org_list):
resolver = self.setup_resolver()
self.site_config.values['course_org_filter'] = course_org_filter
self.site_config.save()
exclude_orgs, org_list = resolver.get_course_org_filter()
assert not exclude_orgs
assert org_list == expected_org_list
@ddt.unpack
@ddt.data(
(None, []),
('course1', [u'course1']),
(['course1', 'course2'], [u'course1', u'course2'])
)
def test_get_course_org_filter_exclude(self, course_org_filter, expected_org_list):
resolver = self.setup_resolver()
self.other_site = SiteFactory.create()
self.other_site_config = SiteConfigurationFactory.create(
site=self.other_site,
values={'course_org_filter': course_org_filter},
)
exclude_orgs, org_list = resolver.get_course_org_filter()
assert exclude_orgs
self.assertItemsEqual(org_list, expected_org_list)

View File

@@ -0,0 +1,101 @@
import datetime
from unittest import skipUnless
import ddt
from django.conf import settings
from mock import patch, DEFAULT, Mock
from openedx.core.djangoapps.schedules.tasks import ScheduleMessageBaseTask
from openedx.core.djangoapps.schedules.resolvers import DEFAULT_NUM_BINS
from openedx.core.djangoapps.schedules.tests.factories import ScheduleConfigFactory
from openedx.core.djangoapps.site_configuration.tests.factories import SiteConfigurationFactory, SiteFactory
from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms
@ddt.ddt
@skip_unless_lms
@skipUnless('openedx.core.djangoapps.schedules.apps.SchedulesConfig' in settings.INSTALLED_APPS,
"Can't test schedules if the app isn't installed")
class TestScheduleMessageBaseTask(CacheIsolationTestCase):
def setUp(self):
super(TestScheduleMessageBaseTask, self).setUp()
self.site = SiteFactory.create()
self.site_config = SiteConfigurationFactory.create(site=self.site)
self.schedule_config = ScheduleConfigFactory.create(site=self.site)
self.basetask = ScheduleMessageBaseTask
def test_send_enqueue_disabled(self):
send = Mock(name='async_send_task')
with patch.multiple(
self.basetask,
is_enqueue_enabled=Mock(return_value=False),
log_debug=DEFAULT,
run=send,
) as patches:
self.basetask.enqueue(
site=self.site,
current_date=datetime.datetime.now(),
day_offset=2
)
patches['log_debug'].assert_called_once_with(
'Message queuing disabled for site %s', self.site.domain)
send.apply_async.assert_not_called()
@ddt.data(0, 2, -3)
def test_send_enqueue_enabled(self, day_offset):
send = Mock(name='async_send_task')
current_date = datetime.datetime.now()
with patch.multiple(
self.basetask,
is_enqueue_enabled=Mock(return_value=True),
get_course_org_filter=Mock(return_value=(False, None)),
log_debug=DEFAULT,
run=send,
) as patches:
self.basetask.enqueue(
site=self.site,
current_date=current_date,
day_offset=day_offset
)
target_date = current_date.replace(hour=0, minute=0, second=0) + \
datetime.timedelta(day_offset)
print(patches['log_debug'].mock_calls)
patches['log_debug'].assert_any_call(
'Target date = %s', target_date.isoformat())
assert send.call_count == DEFAULT_NUM_BINS
@ddt.data(True, False)
def test_is_enqueue_enabled(self, enabled):
with patch.object(self.basetask, 'enqueue_config_var', 'enqueue_recurring_nudge'):
self.schedule_config.enqueue_recurring_nudge = enabled
self.schedule_config.save()
assert self.basetask.is_enqueue_enabled(self.site) == enabled
@ddt.unpack
@ddt.data(
('course1', ['course1']),
(['course1', 'course2'], ['course1', 'course2'])
)
def test_get_course_org_filter_include(self, course_org_filter, expected_org_list):
self.site_config.values['course_org_filter'] = course_org_filter
self.site_config.save()
exclude_orgs, org_list = self.basetask.get_course_org_filter(self.site)
assert not exclude_orgs
assert org_list == expected_org_list
@ddt.unpack
@ddt.data(
(None, []),
('course1', [u'course1']),
(['course1', 'course2'], [u'course1', u'course2'])
)
def test_get_course_org_filter_exclude(self, course_org_filter, expected_org_list):
self.other_site = SiteFactory.create()
self.other_site_config = SiteConfigurationFactory.create(
site=self.other_site,
values={'course_org_filter': course_org_filter},
)
exclude_orgs, org_list = self.basetask.get_course_org_filter(self.site)
assert exclude_orgs
self.assertItemsEqual(org_list, expected_org_list)