From d222b2d71849cd36577d86ca4636f6cc74053b38 Mon Sep 17 00:00:00 2001 From: Calen Pennington Date: Wed, 18 Oct 2017 14:01:40 -0400 Subject: [PATCH] Move bin-task enqueuing into a classmethod (rather than having it on the RecipientResolvers --- .../schedules/management/commands/__init__.py | 20 +- .../management/commands/send_course_update.py | 6 +- .../commands/send_recurring_nudge.py | 6 +- .../commands/send_upgrade_reminder.py | 6 +- .../management/commands/tests/test_base.py | 27 +- .../tests/test_send_recurring_nudge.py | 47 +- .../tests/test_send_upgrade_reminder.py | 40 +- .../core/djangoapps/schedules/resolvers.py | 460 ++++++++---------- openedx/core/djangoapps/schedules/tasks.py | 97 +++- .../schedules/tests/test_resolvers.py | 110 ----- .../djangoapps/schedules/tests/test_tasks.py | 101 ++++ 11 files changed, 456 insertions(+), 464 deletions(-) delete mode 100644 openedx/core/djangoapps/schedules/tests/test_resolvers.py create mode 100644 openedx/core/djangoapps/schedules/tests/test_tasks.py diff --git a/openedx/core/djangoapps/schedules/management/commands/__init__.py b/openedx/core/djangoapps/schedules/management/commands/__init__.py index 258fdc9b35..0aaeefd701 100644 --- a/openedx/core/djangoapps/schedules/management/commands/__init__.py +++ b/openedx/core/djangoapps/schedules/management/commands/__init__.py @@ -8,7 +8,6 @@ from openedx.core.djangoapps.schedules.utils import PrefixedDebugLoggerMixin class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand): - resolver_class = None # define in subclass async_send_task = None # define in subclass def add_arguments(self, parser): @@ -24,20 +23,27 @@ class SendEmailBaseCommand(PrefixedDebugLoggerMixin, BaseCommand): parser.add_argument('site_domain_name') def handle(self, *args, **options): - resolver = self.make_resolver(*args, **options) - self.send_emails(resolver, *args, **options) + self.log_debug('Args = %r', options) - def make_resolver(self, *args, **options): current_date = datetime.datetime( *[int(x) for x in options['date'].split('-')], tzinfo=pytz.UTC ) - self.log_debug('Args = %r', options) self.log_debug('Current date = %s', current_date.isoformat()) site = Site.objects.get(domain__iexact=options['site_domain_name']) self.log_debug('Running for site %s', site.domain) - return self.resolver_class(site, current_date, async_send_task=self.async_send_task) - def send_emails(self, resolver, *args, **options): + override_recipient_email = options.get('override_recipient_email') + self.send_emails(site, current_date, override_recipient_email) + + def enqueue(self, day_offset, site, current_date, override_recipient_email=None): + self.async_send_task.enqueue( + site, + current_date, + day_offset, + override_recipient_email, + ) + + def send_emails(self, *args, **kwargs): pass # define in subclass diff --git a/openedx/core/djangoapps/schedules/management/commands/send_course_update.py b/openedx/core/djangoapps/schedules/management/commands/send_course_update.py index b1e38599fc..2bfa393e29 100644 --- a/openedx/core/djangoapps/schedules/management/commands/send_course_update.py +++ b/openedx/core/djangoapps/schedules/management/commands/send_course_update.py @@ -1,16 +1,14 @@ from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand -from openedx.core.djangoapps.schedules.resolvers import CourseUpdateResolver from openedx.core.djangoapps.schedules.tasks import ScheduleCourseUpdate class Command(SendEmailBaseCommand): - resolver_class = CourseUpdateResolver async_send_task = ScheduleCourseUpdate def __init__(self, *args, **kwargs): super(Command, self).__init__(*args, **kwargs) self.log_prefix = 'Upgrade Reminder' - def send_emails(self, resolver, *args, **options): + def send_emails(self, *args, **kwargs): for day_offset in xrange(-7, -77, -7): - resolver.send(day_offset, options.get('override_recipient_email')) + self.enqueue(day_offset, *args, **kwargs) diff --git a/openedx/core/djangoapps/schedules/management/commands/send_recurring_nudge.py b/openedx/core/djangoapps/schedules/management/commands/send_recurring_nudge.py index 0202048695..b5732e20b9 100644 --- a/openedx/core/djangoapps/schedules/management/commands/send_recurring_nudge.py +++ b/openedx/core/djangoapps/schedules/management/commands/send_recurring_nudge.py @@ -1,16 +1,14 @@ from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand -from openedx.core.djangoapps.schedules.resolvers import ScheduleStartResolver from openedx.core.djangoapps.schedules.tasks import ScheduleRecurringNudge class Command(SendEmailBaseCommand): - resolver_class = ScheduleStartResolver async_send_task = ScheduleRecurringNudge def __init__(self, *args, **kwargs): super(Command, self).__init__(*args, **kwargs) self.log_prefix = 'Scheduled Nudge' - def send_emails(self, resolver, *args, **options): + def send_emails(self, *args, **kwargs): for day_offset in (-3, -10): - resolver.send(day_offset, options.get('override_recipient_email')) + self.enqueue(day_offset, *args, **kwargs) diff --git a/openedx/core/djangoapps/schedules/management/commands/send_upgrade_reminder.py b/openedx/core/djangoapps/schedules/management/commands/send_upgrade_reminder.py index fbfa0fb1e6..58a6b46a96 100644 --- a/openedx/core/djangoapps/schedules/management/commands/send_upgrade_reminder.py +++ b/openedx/core/djangoapps/schedules/management/commands/send_upgrade_reminder.py @@ -1,15 +1,13 @@ from openedx.core.djangoapps.schedules.management.commands import SendEmailBaseCommand -from openedx.core.djangoapps.schedules.resolvers import UpgradeReminderResolver from openedx.core.djangoapps.schedules.tasks import ScheduleUpgradeReminder class Command(SendEmailBaseCommand): - resolver_class = UpgradeReminderResolver async_send_task = ScheduleUpgradeReminder def __init__(self, *args, **kwargs): super(Command, self).__init__(*args, **kwargs) self.log_prefix = 'Upgrade Reminder' - def send_emails(self, resolver, *args, **options): - resolver.send(2, options.get('override_recipient_email')) + def send_emails(self, *args, **kwargs): + self.enqueue(2, *args, **kwargs) diff --git a/openedx/core/djangoapps/schedules/management/commands/tests/test_base.py b/openedx/core/djangoapps/schedules/management/commands/tests/test_base.py index c69e89aa3e..3cb6ebc59f 100644 --- a/openedx/core/djangoapps/schedules/management/commands/tests/test_base.py +++ b/openedx/core/djangoapps/schedules/management/commands/tests/test_base.py @@ -18,24 +18,13 @@ from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_un class TestSendEmailBaseCommand(CacheIsolationTestCase): def setUp(self): self.command = SendEmailBaseCommand() - - def test_init_resolver_class(self): - assert self.command.resolver_class is None - - def test_make_resolver(self): - with patch.object(self.command, 'resolver_class') as resolver_class: - example_site = SiteFactory(domain='example.com') - self.command.make_resolver(site_domain_name='example.com', date='2017-09-29') - resolver_class.assert_called_once_with( - example_site, - datetime.datetime(2017, 9, 29, tzinfo=pytz.UTC), - async_send_task=None, - ) + self.site = SiteFactory() def test_handle(self): - with patch.object(self.command, 'make_resolver') as make_resolver: - make_resolver.return_value = 'resolver' - with patch.object(self.command, 'send_emails') as send_emails: - self.command.handle(date='2017-09-29') - make_resolver.assert_called_once_with(date='2017-09-29') - send_emails.assert_called_once_with('resolver', date='2017-09-29') + with patch.object(self.command, 'send_emails') as send_emails: + self.command.handle(site_domain_name=self.site.domain, date='2017-09-29') + send_emails.assert_called_once_with( + self.site, + datetime.datetime(2017, 9, 29, tzinfo=pytz.UTC), + None + ) diff --git a/openedx/core/djangoapps/schedules/management/commands/tests/test_send_recurring_nudge.py b/openedx/core/djangoapps/schedules/management/commands/tests/test_send_recurring_nudge.py index 3fae1c53ac..082d8f61c2 100644 --- a/openedx/core/djangoapps/schedules/management/commands/tests/test_send_recurring_nudge.py +++ b/openedx/core/djangoapps/schedules/management/commands/tests/test_send_recurring_nudge.py @@ -60,35 +60,33 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True) - @patch.object(nudge.Command, 'resolver_class') - def test_handle(self, mock_resolver): + @patch.object(nudge.Command, 'async_send_task') + def test_handle(self, mock_send): test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) nudge.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain) - mock_resolver.assert_called_with( - self.site_config.site, - test_day, - async_send_task=nudge.Command.async_send_task, - ) - for day in (-3, -10): - mock_resolver().send.assert_any_call(day, None) + mock_send.enqueue.assert_any_call( + self.site_config.site, + test_day, + day, + None + ) @patch.object(tasks, 'ace') def test_resolver_send(self, mock_ace): current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) - mock_schedule_bin = Mock() - nudge.ScheduleStartResolver(self.site_config.site, current_day, mock_schedule_bin).send(-3) - test_day = current_day + datetime.timedelta(days=-3) - self.assertFalse(mock_schedule_bin.called) - mock_schedule_bin.apply_async.assert_any_call( - (self.site_config.site.id, serialize(test_day), -3, 0, [], True, None), - retry=False, - ) - mock_schedule_bin.apply_async.assert_any_call( - (self.site_config.site.id, serialize(test_day), -3, resolvers.RECURRING_NUDGE_NUM_BINS - 1, [], True, None), - retry=False, - ) - self.assertFalse(mock_ace.send.called) + with patch.object(tasks.ScheduleRecurringNudge, 'apply_async') as mock_apply_async: + tasks.ScheduleRecurringNudge.enqueue(self.site_config.site, current_day, -3) + test_day = current_day + datetime.timedelta(days=-3) + mock_apply_async.assert_any_call( + (self.site_config.site.id, serialize(test_day), -3, 0, [], True, None), + retry=False, + ) + mock_apply_async.assert_any_call( + (self.site_config.site.id, serialize(test_day), -3, resolvers.RECURRING_NUDGE_NUM_BINS - 1, [], True, None), + retry=False, + ) + self.assertFalse(mock_ace.send.called) @ddt.data(1, 10, 100) @patch.object(tasks, 'ace') @@ -180,11 +178,12 @@ class TestSendRecurringNudge(FilteredQueryCountMixin, CacheIsolationTestCase): mock_schedule_bin = Mock() current_datetime = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) - nudge.ScheduleStartResolver( + tasks.ScheduleRecurringNudge.enqueue( self.site_config.site, current_datetime, mock_schedule_bin, - ).send(3) + 3 + ) self.assertFalse(mock_schedule_bin.called) self.assertFalse(mock_schedule_bin.apply_async.called) self.assertFalse(mock_ace.send.called) diff --git a/openedx/core/djangoapps/schedules/management/commands/tests/test_send_upgrade_reminder.py b/openedx/core/djangoapps/schedules/management/commands/tests/test_send_upgrade_reminder.py index 99cf2829c1..f0c1875aaa 100644 --- a/openedx/core/djangoapps/schedules/management/commands/tests/test_send_upgrade_reminder.py +++ b/openedx/core/djangoapps/schedules/management/commands/tests/test_send_upgrade_reminder.py @@ -63,36 +63,34 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): DynamicUpgradeDeadlineConfiguration.objects.create(enabled=True) - @patch.object(reminder.Command, 'resolver_class') - def test_handle(self, mock_resolver): + @patch.object(reminder.Command, 'async_send_task') + def test_handle(self, mock_send): test_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) reminder.Command().handle(date='2017-08-01', site_domain_name=self.site_config.site.domain) - mock_resolver.assert_called_with( + mock_send.enqueue.assert_called_with( self.site_config.site, test_day, - async_send_task=reminder.Command.async_send_task, + 2, + None ) - mock_resolver().send.assert_any_call(2, None) - @patch.object(tasks, 'ace') def test_resolver_send(self, mock_ace): current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) test_day = current_day + datetime.timedelta(days=2) ScheduleFactory.create(upgrade_deadline=datetime.datetime(2017, 8, 3, 15, 34, 30, tzinfo=pytz.UTC)) - mock_schedule_bin = Mock() - reminder.UpgradeReminderResolver(self.site_config.site, current_day, mock_schedule_bin).send(2) - self.assertFalse(mock_schedule_bin.called) - mock_schedule_bin.apply_async.assert_any_call( - (self.site_config.site.id, serialize(test_day), 2, 0, [], True, None), - retry=False, - ) - mock_schedule_bin.apply_async.assert_any_call( - (self.site_config.site.id, serialize(test_day), 2, resolvers.UPGRADE_REMINDER_NUM_BINS - 1, [], True, None), - retry=False, - ) - self.assertFalse(mock_ace.send.called) + with patch.object(tasks.ScheduleUpgradeReminder, 'apply_async') as mock_apply_async: + tasks.ScheduleUpgradeReminder.enqueue(self.site_config.site, current_day, 2) + mock_apply_async.assert_any_call( + (self.site_config.site.id, serialize(test_day), 2, 0, [], True, None), + retry=False, + ) + mock_apply_async.assert_any_call( + (self.site_config.site.id, serialize(test_day), 2, resolvers.UPGRADE_REMINDER_NUM_BINS - 1, [], True, None), + retry=False, + ) + self.assertFalse(mock_ace.send.called) @ddt.data(1, 10, 100) @patch.object(tasks, 'ace') @@ -164,11 +162,11 @@ class TestUpgradeReminder(FilteredQueryCountMixin, CacheIsolationTestCase): mock_schedule_bin = Mock() current_day = datetime.datetime(2017, 8, 1, tzinfo=pytz.UTC) - reminder.UpgradeReminderResolver( + tasks.ScheduleUpgradeReminder.enqueue( self.site_config.site, current_day, - async_send_task=mock_schedule_bin, - ).send(3) + day_offset=3, + ) self.assertFalse(mock_schedule_bin.called) self.assertFalse(mock_schedule_bin.apply_async.called) self.assertFalse(mock_ace.send.called) diff --git a/openedx/core/djangoapps/schedules/resolvers.py b/openedx/core/djangoapps/schedules/resolvers.py index 21203fc8a2..b8c47cb6fd 100644 --- a/openedx/core/djangoapps/schedules/resolvers.py +++ b/openedx/core/djangoapps/schedules/resolvers.py @@ -55,71 +55,8 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver): num_bins -- the int number of bins to split the users into enqueue_config_var -- the string field name of the config variable on ScheduleConfig to check before enqueuing """ - num_bins = DEFAULT_NUM_BINS - enqueue_config_var = None # define in subclass - - def __init__(self, site, current_date, async_send_task, *args, **kwargs): - super(BinnedSchedulesBaseResolver, self).__init__(*args, **kwargs) - self.site = site - self.current_date = current_date.replace(hour=0, minute=0, second=0) - self.async_send_task = async_send_task - - def send(self, day_offset, override_recipient_email=None): - if not self.is_enqueue_enabled(): - self.log_debug('Message queuing disabled for site %s', self.site.domain) - return - - exclude_orgs, org_list = self.get_course_org_filter() - - target_date = self.current_date + datetime.timedelta(days=day_offset) - self.log_debug('Target date = %s', target_date.isoformat()) - for bin in range(self.num_bins): - task_args = ( - self.site.id, serialize(target_date), day_offset, bin, org_list, exclude_orgs, override_recipient_email, - ) - self.log_debug('Launching task with args = %r', task_args) - self.async_send_task.apply_async( - task_args, - retry=False, - ) - - def is_enqueue_enabled(self): - if self.enqueue_config_var: - return getattr(ScheduleConfig.current(self.site), self.enqueue_config_var) - return False - - def get_course_org_filter(self): - """ - Given the configuration of sites, get the list of orgs that should be included or excluded from this send. - - Returns: - tuple: Returns a tuple (exclude_orgs, org_list). If exclude_orgs is True, then org_list is a list of the - only orgs that should be included in this send. If exclude_orgs is False, then org_list is a list of - orgs that should be excluded from this send. All other orgs should be included. - """ - try: - site_config = SiteConfiguration.objects.get(site_id=self.site.id) - org_list = site_config.get_value('course_org_filter') - exclude_orgs = False - if not org_list: - not_orgs = set() - for other_site_config in SiteConfiguration.objects.all(): - other = other_site_config.get_value('course_org_filter') - if not isinstance(other, list): - if other is not None: - not_orgs.add(other) - else: - not_orgs.update(other) - org_list = list(not_orgs) - exclude_orgs = True - elif not isinstance(org_list, list): - org_list = [org_list] - except SiteConfiguration.DoesNotExist: - org_list = None - exclude_orgs = False - finally: - return exclude_orgs, org_list - + def send(self, msg_type): + pass def get_schedules_with_target_date_by_bin_and_orgs(schedule_date_field, current_datetime, target_datetime, bin_num, num_bins=DEFAULT_NUM_BINS, org_list=None, exclude_orgs=False, @@ -196,18 +133,6 @@ class RecurringNudge(ScheduleMessageType): self.name = "recurringnudge_day{}".format(day) -class ScheduleStartResolver(BinnedSchedulesBaseResolver): - """ - Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``. - """ - num_bins = RECURRING_NUDGE_NUM_BINS - enqueue_config_var = 'enqueue_recurring_nudge' - - def __init__(self, *args, **kwargs): - super(ScheduleStartResolver, self).__init__(*args, **kwargs) - self.log_prefix = 'Scheduled Nudge' - - def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset): # This identifies the type of message being sent, for example: schedules.recurring_nudge3. set_custom_metric('message_name', '{0}.{1}'.format( @@ -225,73 +150,81 @@ def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_of set_custom_metric('send_uuid', message_type.uuid) -def recurring_nudge_schedule_bin( - async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, -): - target_datetime = deserialize(target_day_str) - # TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here - current_datetime = target_datetime - datetime.timedelta(days=day_offset) - msg_type = RecurringNudge(abs(day_offset)) - site = Site.objects.get(id=site_id) +class ScheduleStartResolver(BinnedSchedulesBaseResolver): + """ + Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``. + """ - _annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset) + def __init__(self, *args, **kwargs): + super(ScheduleStartResolver, self).__init__(*args, **kwargs) + self.log_prefix = 'Scheduled Nudge' - for (user, language, context) in _recurring_nudge_schedules_for_bin( - site, - current_datetime, - target_datetime, - bin_num, - org_list, - exclude_orgs + def recurring_nudge_schedule_bin( + self, async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, ): - msg = msg_type.personalize( - Recipient( - user.username, - override_recipient_email or user.email, - ), - language, - context, + target_datetime = deserialize(target_day_str) + # TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here + current_datetime = target_datetime - datetime.timedelta(days=day_offset) + msg_type = RecurringNudge(abs(day_offset)) + site = Site.objects.get(id=site_id) + + _annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset) + + for (user, language, context) in self._recurring_nudge_schedules_for_bin( + site, + current_datetime, + target_datetime, + bin_num, + org_list, + exclude_orgs + ): + msg = msg_type.personalize( + Recipient( + user.username, + override_recipient_email or user.email, + ), + language, + context, + ) + with function_trace('enqueue_send_task'): + async_send_task.apply_async( + (site_id, str(msg)), retry=False) + + + def _recurring_nudge_schedules_for_bin(self, site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False): + schedules = get_schedules_with_target_date_by_bin_and_orgs( + schedule_date_field='start', + current_datetime=current_datetime, + target_datetime=target_datetime, + bin_num=bin_num, + num_bins=RECURRING_NUDGE_NUM_BINS, + org_list=org_list, + exclude_orgs=exclude_orgs, ) - with function_trace('enqueue_send_task'): - async_send_task.apply_async( - (site_id, str(msg)), retry=False) + LOG.debug('Recurring Nudge: Query = %r', schedules.query.sql_with_params()) + for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user): + user_schedules = list(user_schedules) + course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules] -def _recurring_nudge_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False): + first_schedule = user_schedules[0] + template_context = get_base_template_context(site) + template_context.update({ + 'student_name': user.profile.name, - schedules = get_schedules_with_target_date_by_bin_and_orgs( - schedule_date_field='start', - current_datetime=current_datetime, - target_datetime=target_datetime, - bin_num=bin_num, - num_bins=RECURRING_NUDGE_NUM_BINS, - org_list=org_list, - exclude_orgs=exclude_orgs, - ) + 'course_name': first_schedule.enrollment.course.display_name, + 'course_url': absolute_url(site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])), - for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user): - user_schedules = list(user_schedules) - course_id_strs = [str(schedule.enrollment.course_id) - for schedule in user_schedules] + # This is used by the bulk email optout policy + 'course_ids': course_id_strs, + }) - first_schedule = user_schedules[0] - template_context = get_base_template_context(site) - template_context.update({ - 'student_name': user.profile.name, + # Information for including upsell messaging in template. + _add_upsell_button_information_to_template_context( + user, first_schedule, template_context) - 'course_name': first_schedule.enrollment.course.display_name, - 'course_url': absolute_url(site, reverse('course_root', args=[str(first_schedule.enrollment.course_id)])), - - # This is used by the bulk email optout policy - 'course_ids': course_id_strs, - }) - - # Information for including upsell messaging in template. - _add_upsell_button_information_to_template_context( - user, first_schedule, template_context) - - yield (user, first_schedule.enrollment.course.language, template_context) + yield (user, first_schedule.enrollment.course.language, template_context) def _get_datetime_beginning_of_day(dt): @@ -308,81 +241,78 @@ class UpgradeReminderResolver(BinnedSchedulesBaseResolver): """ Send a message to all users whose verified upgrade deadline is at ``self.current_date`` + ``day_offset``. """ - num_bins = UPGRADE_REMINDER_NUM_BINS - enqueue_config_var = 'enqueue_upgrade_reminder' - def __init__(self, *args, **kwargs): super(UpgradeReminderResolver, self).__init__(*args, **kwargs) self.log_prefix = 'Upgrade Reminder' - -def upgrade_reminder_schedule_bin( - async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, -): - target_datetime = deserialize(target_day_str) - # TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here - current_datetime = target_datetime - datetime.timedelta(days=day_offset) - msg_type = UpgradeReminder() - site = Site.objects.get(id=site_id) - - _annotate_for_monitoring(msg_type, site, bin_num,target_day_str, day_offset) - - for (user, language, context) in _upgrade_reminder_schedules_for_bin( - site, - current_datetime, - target_datetime, - bin_num, - org_list, - exclude_orgs + def upgrade_reminder_schedule_bin( + self, async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, ): - msg = msg_type.personalize( - Recipient( - user.username, - override_recipient_email or user.email, - ), - language, - context, + target_datetime = deserialize(target_day_str) + # TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here + current_datetime = target_datetime - datetime.timedelta(days=day_offset) + msg_type = UpgradeReminder() + site = Site.objects.get(id=site_id) + + _annotate_for_monitoring(msg_type, site, bin_num, + target_day_str, day_offset) + + for (user, language, context) in self._upgrade_reminder_schedules_for_bin( + site, + current_datetime, + target_datetime, + bin_num, + org_list, + exclude_orgs + ): + msg = msg_type.personalize( + Recipient( + user.username, + override_recipient_email or user.email, + ), + language, + context, + ) + with function_trace('enqueue_send_task'): + async_send_task.apply_async( + (site_id, str(msg)), retry=False) + + + def _upgrade_reminder_schedules_for_bin(self, site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False): + schedules = get_schedules_with_target_date_by_bin_and_orgs( + schedule_date_field='upgrade_deadline', + current_datetime=current_datetime, + target_datetime=target_datetime, + bin_num=bin_num, + num_bins=RECURRING_NUDGE_NUM_BINS, + org_list=org_list, + exclude_orgs=exclude_orgs, ) - with function_trace('enqueue_send_task'): - async_send_task.apply_async( - (site_id, str(msg)), retry=False) + for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user): + user_schedules = list(user_schedules) + course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules] -def _upgrade_reminder_schedules_for_bin(site, current_datetime, target_datetime, bin_num, org_list, exclude_orgs=False): - schedules = get_schedules_with_target_date_by_bin_and_orgs( - schedule_date_field='upgrade_deadline', - current_datetime=current_datetime, - target_datetime=target_datetime, - bin_num=bin_num, - num_bins=RECURRING_NUDGE_NUM_BINS, - org_list=org_list, - exclude_orgs=exclude_orgs, - ) + first_schedule = user_schedules[0] + template_context = get_base_template_context(site) + template_context.update({ + 'student_name': user.profile.name, + 'course_links': [ + { + 'url': absolute_url(site, reverse('course_root', args=[str(s.enrollment.course_id)])), + 'name': s.enrollment.course.display_name + } for s in user_schedules + ], + 'first_course_name': first_schedule.enrollment.course.display_name, + 'cert_image': absolute_url(site, static('course_experience/images/verified-cert.png')), - for (user, user_schedules) in groupby(schedules, lambda s: s.enrollment.user): - user_schedules = list(user_schedules) - course_id_strs = [str(schedule.enrollment.course_id) for schedule in user_schedules] + # This is used by the bulk email optout policy + 'course_ids': course_id_strs, + }) - first_schedule = user_schedules[0] - template_context = get_base_template_context(site) - template_context.update({ - 'student_name': user.profile.name, - 'course_links': [ - { - 'url': absolute_url(site, reverse('course_root', args=[str(s.enrollment.course_id)])), - 'name': s.enrollment.course.display_name - } for s in user_schedules - ], - 'first_course_name': first_schedule.enrollment.course.display_name, - 'cert_image': absolute_url(site, static('course_experience/images/verified-cert.png')), + _add_upsell_button_information_to_template_context(user, first_schedule, template_context) - # This is used by the bulk email optout policy - 'course_ids': course_id_strs, - }) - - _add_upsell_button_information_to_template_context(user, first_schedule, template_context) - - yield (user, first_schedule.enrollment.course.language, template_context) + yield (user, first_schedule.enrollment.course.language, template_context) def _add_upsell_button_information_to_template_context(user, schedule, template_context): @@ -424,87 +354,83 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver): Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset`` and the course has updates. """ - num_bins = COURSE_UPDATE_NUM_BINS - enqueue_config_var = 'enqueue_course_update' - def __init__(self, *args, **kwargs): super(CourseUpdateResolver, self).__init__(*args, **kwargs) self.log_prefix = 'Course Update' - -def course_update_schedule_bin( - async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, -): - target_datetime = deserialize(target_day_str) - # TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here - current_datetime = target_datetime - datetime.timedelta(days=day_offset) - msg_type = CourseUpdate() - site = Site.objects.get(id=site_id) - - _annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset) - - for (user, language, context) in _course_update_schedules_for_bin( - site, - current_datetime, - target_datetime, - day_offset, - bin_num, - org_list, - exclude_orgs + def course_update_schedule_bin( + self, async_send_task, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, ): - msg = msg_type.personalize( - Recipient( - user.username, - override_recipient_email or user.email, - ), - language, - context, + target_datetime = deserialize(target_day_str) + # TODO: in the next refactor of this task, pass in current_datetime instead of reproducing it here + current_datetime = target_datetime - datetime.timedelta(days=day_offset) + msg_type = CourseUpdate() + site = Site.objects.get(id=site_id) + + _annotate_for_monitoring(msg_type, site, bin_num, + target_day_str, day_offset) + + for (user, language, context) in self._course_update_schedules_for_bin( + site, + current_datetime, + target_datetime, + day_offset, + bin_num, + org_list, + exclude_orgs + ): + msg = msg_type.personalize( + Recipient( + user.username, + override_recipient_email or user.email, + ), + language, + context, + ) + with function_trace('enqueue_send_task'): + async_send_task.apply_async( + (site_id, str(msg)), retry=False) + + def _course_update_schedules_for_bin(self, site, current_datetime, target_datetime, day_offset, bin_num, org_list, + exclude_orgs=False): + week_num = abs(day_offset) / 7 + schedules = get_schedules_with_target_date_by_bin_and_orgs( + schedule_date_field='start', + current_datetime=current_datetime, + target_datetime=target_datetime, + bin_num=bin_num, + num_bins=COURSE_UPDATE_NUM_BINS, + org_list=org_list, + exclude_orgs=exclude_orgs, + order_by='enrollment__course', ) - with function_trace('enqueue_send_task'): - async_send_task.apply_async( - (site_id, str(msg)), retry=False) + LOG.debug('Course Update: Query = %r', schedules.query.sql_with_params()) + for schedule in schedules: + enrollment = schedule.enrollment + try: + week_summary = get_course_week_summary( + enrollment.course_id, week_num) + except CourseUpdateDoesNotExist: + continue + user = enrollment.user + course_id_str = str(enrollment.course_id) -def _course_update_schedules_for_bin(site, current_datetime, target_datetime, day_offset, bin_num, org_list, - exclude_orgs=False): - week_num = abs(day_offset) / 7 - schedules = get_schedules_with_target_date_by_bin_and_orgs( - schedule_date_field='start', - current_datetime=current_datetime, - target_datetime=target_datetime, - bin_num=bin_num, - num_bins=COURSE_UPDATE_NUM_BINS, - org_list=org_list, - exclude_orgs=exclude_orgs, - order_by='enrollment__course', - ) + template_context = get_base_template_context(site) + template_context.update({ + 'student_name': user.profile.name, + 'user_personal_address': user.profile.name if user.profile.name else user.username, + 'course_name': schedule.enrollment.course.display_name, + 'course_url': absolute_url(site, reverse('course_root', args=[str(schedule.enrollment.course_id)])), + 'week_num': week_num, + 'week_summary': week_summary, - for schedule in schedules: - enrollment = schedule.enrollment - try: - week_summary = get_course_week_summary( - enrollment.course_id, week_num) - except CourseUpdateDoesNotExist: - continue + # This is used by the bulk email optout policy + 'course_ids': [course_id_str], + }) - user = enrollment.user - course_id_str = str(enrollment.course_id) - - template_context = get_base_template_context(site) - template_context.update({ - 'student_name': user.profile.name, - 'user_personal_address': user.profile.name if user.profile.name else user.username, - 'course_name': schedule.enrollment.course.display_name, - 'course_url': absolute_url(site, reverse('course_root', args=[str(schedule.enrollment.course_id)])), - 'week_num': week_num, - 'week_summary': week_summary, - - # This is used by the bulk email optout policy - 'course_ids': [course_id_str], - }) - - yield (user, schedule.enrollment.course.language, template_context) + yield (user, schedule.enrollment.course.language, template_context) @request_cached diff --git a/openedx/core/djangoapps/schedules/tasks.py b/openedx/core/djangoapps/schedules/tasks.py index 0fd135aa24..f21b320d00 100644 --- a/openedx/core/djangoapps/schedules/tasks.py +++ b/openedx/core/djangoapps/schedules/tasks.py @@ -13,13 +13,14 @@ from django.utils.formats import dateformat, get_format from edx_ace import ace from edx_ace.message import Message from edx_ace.recipient import Recipient -from edx_ace.utils.date import deserialize +from edx_ace.utils.date import deserialize, serialize from opaque_keys.edx.keys import CourseKey from openedx.core.djangoapps.monitoring_utils import set_custom_metric from openedx.core.djangoapps.schedules.models import Schedule, ScheduleConfig from openedx.core.djangoapps.schedules import resolvers +from openedx.core.djangoapps.site_configuration.models import SiteConfiguration LOG = logging.getLogger(__name__) @@ -68,12 +69,91 @@ def _recurring_nudge_schedule_send(site_id, msg_str): class ScheduleMessageBaseTask(Task): ignore_result = True routing_key = ROUTING_KEY + num_bins = resolvers.DEFAULT_NUM_BINS + enqueue_config_var = None # define in subclass + log_prefix = None + + @classmethod + def log_debug(cls, message, *args, **kwargs): + LOG.debug(cls.log_prefix + ': ' + message, *args, **kwargs) + + @classmethod + def enqueue(cls, site, current_date, day_offset, override_recipient_email=None): + current_date = current_date.replace(hour=0, minute=0, second=0) + + if not cls.is_enqueue_enabled(site): + cls.log_debug( + 'Message queuing disabled for site %s', site.domain) + return + + exclude_orgs, org_list = cls.get_course_org_filter(site) + + target_date = current_date + datetime.timedelta(days=day_offset) + cls.log_debug('Target date = %s', target_date.isoformat()) + for bin in range(cls.num_bins): + task_args = ( + site.id, + serialize(target_date), + day_offset, + bin, + org_list, + exclude_orgs, + override_recipient_email, + ) + cls.log_debug('Launching task with args = %r', task_args) + cls.apply_async( + task_args, + retry=False, + ) + + @classmethod + def is_enqueue_enabled(cls, site): + if cls.enqueue_config_var: + return getattr(ScheduleConfig.current(site), cls.enqueue_config_var) + return False + + @classmethod + def get_course_org_filter(cls, site): + """ + Given the configuration of sites, get the list of orgs that should be included or excluded from this send. + + Returns: + tuple: Returns a tuple (exclude_orgs, org_list). If exclude_orgs is True, then org_list is a list of the + only orgs that should be included in this send. If exclude_orgs is False, then org_list is a list of + orgs that should be excluded from this send. All other orgs should be included. + """ + try: + site_config = SiteConfiguration.objects.get(site_id=site.id) + org_list = site_config.get_value('course_org_filter') + exclude_orgs = False + if not org_list: + not_orgs = set() + for other_site_config in SiteConfiguration.objects.all(): + other = other_site_config.get_value('course_org_filter') + if not isinstance(other, list): + if other is not None: + not_orgs.add(other) + else: + not_orgs.update(other) + org_list = list(not_orgs) + exclude_orgs = True + elif not isinstance(org_list, list): + org_list = [org_list] + except SiteConfiguration.DoesNotExist: + org_list = None + exclude_orgs = False + finally: + return exclude_orgs, org_list class ScheduleRecurringNudge(ScheduleMessageBaseTask): + num_bins = resolvers.RECURRING_NUDGE_NUM_BINS + enqueue_config_var = 'enqueue_recurring_nudge' + log_prefix = 'Scheduled Nudge' + def run( self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, ): - return resolvers.recurring_nudge_schedule_bin( + return resolvers.ScheduleStartResolver().recurring_nudge_schedule_bin( _recurring_nudge_schedule_send, site_id, target_day_str, @@ -86,10 +166,15 @@ class ScheduleRecurringNudge(ScheduleMessageBaseTask): class ScheduleUpgradeReminder(ScheduleMessageBaseTask): + num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS + enqueue_config_var = 'enqueue_upgrade_reminder' + log_prefix = 'Course Update' + + def run( self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, ): - return resolvers.upgrade_reminder_schedule_bin( + return resolvers.UpgradeReminderResolver().upgrade_reminder_schedule_bin( _upgrade_reminder_schedule_send, site_id, target_day_str, @@ -111,10 +196,14 @@ def _upgrade_reminder_schedule_send(site_id, msg_str): class ScheduleCourseUpdate(ScheduleMessageBaseTask): + num_bins = resolvers.COURSE_UPDATE_NUM_BINS + enqueue_config_var = 'enqueue_course_update' + log_prefix = 'Course Update' + def run( self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, ): - return resolvers.course_update_schedule_bin( + return resolvers.CourseUpdateResolver().course_update_schedule_bin( _course_update_schedule_send, site_id, target_day_str, diff --git a/openedx/core/djangoapps/schedules/tests/test_resolvers.py b/openedx/core/djangoapps/schedules/tests/test_resolvers.py deleted file mode 100644 index 3c65ea5dcf..0000000000 --- a/openedx/core/djangoapps/schedules/tests/test_resolvers.py +++ /dev/null @@ -1,110 +0,0 @@ -import datetime -from unittest import skipUnless - -import ddt -from django.conf import settings -from mock import patch - -from openedx.core.djangoapps.schedules.resolvers import BinnedSchedulesBaseResolver -from openedx.core.djangoapps.schedules.resolvers import DEFAULT_NUM_BINS -from openedx.core.djangoapps.schedules.tests.factories import ScheduleConfigFactory -from openedx.core.djangoapps.site_configuration.tests.factories import SiteConfigurationFactory, SiteFactory -from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms - - -@ddt.ddt -@skip_unless_lms -@skipUnless('openedx.core.djangoapps.schedules.apps.SchedulesConfig' in settings.INSTALLED_APPS, - "Can't test schedules if the app isn't installed") -class TestBinnedSchedulesBaseResolver(CacheIsolationTestCase): - def setUp(self): - super(TestBinnedSchedulesBaseResolver, self).setUp() - - self.site = SiteFactory.create() - self.site_config = SiteConfigurationFactory.create(site=self.site) - self.schedule_config = ScheduleConfigFactory.create(site=self.site) - - def setup_resolver(self, site=None, current_date=None, async_send_task=None): - if site is None: - site = self.site - if current_date is None: - current_date = datetime.datetime.now() - resolver = BinnedSchedulesBaseResolver(self.site, current_date, async_send_task) - return resolver - - def test_init_site(self): - resolver = self.setup_resolver() - assert resolver.site == self.site - - def test_init_current_date(self): - current_time = datetime.datetime.now() - resolver = self.setup_resolver(current_date=current_time) - current_date = current_time.replace(hour=0, minute=0, second=0) - assert resolver.current_date == current_date - - def test_init_async_send_task(self): - resolver = self.setup_resolver() - assert resolver.async_send_task is None - - def test_init_num_bins(self): - resolver = self.setup_resolver() - assert resolver.num_bins == DEFAULT_NUM_BINS - - def test_send_enqueue_disabled(self): - resolver = self.setup_resolver() - resolver.is_enqueue_enabled = lambda: False - with patch.object(resolver, 'async_send_task') as send: - with patch.object(resolver, 'log_debug') as log_debug: - resolver.send(day_offset=2) - log_debug.assert_called_once_with('Message queuing disabled for site %s', self.site.domain) - send.apply_async.assert_not_called() - - @ddt.data(0, 2, -3) - def test_send_enqueue_enabled(self, day_offset): - resolver = self.setup_resolver() - resolver.is_enqueue_enabled = lambda: True - resolver.get_course_org_filter = lambda: (False, None) - with patch.object(resolver, 'async_send_task') as send: - with patch.object(resolver, 'log_debug') as log_debug: - resolver.send(day_offset=day_offset) - target_date = resolver.current_date + datetime.timedelta(day_offset) - log_debug.assert_any_call('Target date = %s', target_date.isoformat()) - assert send.apply_async.call_count == DEFAULT_NUM_BINS - - @ddt.data(True, False) - def test_is_enqueue_enabled(self, enabled): - resolver = self.setup_resolver() - resolver.enqueue_config_var = 'enqueue_recurring_nudge' - self.schedule_config.enqueue_recurring_nudge = enabled - self.schedule_config.save() - assert resolver.is_enqueue_enabled() == enabled - - @ddt.unpack - @ddt.data( - ('course1', ['course1']), - (['course1', 'course2'], ['course1', 'course2']) - ) - def test_get_course_org_filter_include(self, course_org_filter, expected_org_list): - resolver = self.setup_resolver() - self.site_config.values['course_org_filter'] = course_org_filter - self.site_config.save() - exclude_orgs, org_list = resolver.get_course_org_filter() - assert not exclude_orgs - assert org_list == expected_org_list - - @ddt.unpack - @ddt.data( - (None, []), - ('course1', [u'course1']), - (['course1', 'course2'], [u'course1', u'course2']) - ) - def test_get_course_org_filter_exclude(self, course_org_filter, expected_org_list): - resolver = self.setup_resolver() - self.other_site = SiteFactory.create() - self.other_site_config = SiteConfigurationFactory.create( - site=self.other_site, - values={'course_org_filter': course_org_filter}, - ) - exclude_orgs, org_list = resolver.get_course_org_filter() - assert exclude_orgs - self.assertItemsEqual(org_list, expected_org_list) diff --git a/openedx/core/djangoapps/schedules/tests/test_tasks.py b/openedx/core/djangoapps/schedules/tests/test_tasks.py new file mode 100644 index 0000000000..63517a4d01 --- /dev/null +++ b/openedx/core/djangoapps/schedules/tests/test_tasks.py @@ -0,0 +1,101 @@ +import datetime +from unittest import skipUnless + +import ddt +from django.conf import settings +from mock import patch, DEFAULT, Mock + +from openedx.core.djangoapps.schedules.tasks import ScheduleMessageBaseTask +from openedx.core.djangoapps.schedules.resolvers import DEFAULT_NUM_BINS +from openedx.core.djangoapps.schedules.tests.factories import ScheduleConfigFactory +from openedx.core.djangoapps.site_configuration.tests.factories import SiteConfigurationFactory, SiteFactory +from openedx.core.djangolib.testing.utils import CacheIsolationTestCase, skip_unless_lms + + +@ddt.ddt +@skip_unless_lms +@skipUnless('openedx.core.djangoapps.schedules.apps.SchedulesConfig' in settings.INSTALLED_APPS, + "Can't test schedules if the app isn't installed") +class TestScheduleMessageBaseTask(CacheIsolationTestCase): + def setUp(self): + super(TestScheduleMessageBaseTask, self).setUp() + + self.site = SiteFactory.create() + self.site_config = SiteConfigurationFactory.create(site=self.site) + self.schedule_config = ScheduleConfigFactory.create(site=self.site) + self.basetask = ScheduleMessageBaseTask + + def test_send_enqueue_disabled(self): + send = Mock(name='async_send_task') + with patch.multiple( + self.basetask, + is_enqueue_enabled=Mock(return_value=False), + log_debug=DEFAULT, + run=send, + ) as patches: + self.basetask.enqueue( + site=self.site, + current_date=datetime.datetime.now(), + day_offset=2 + ) + patches['log_debug'].assert_called_once_with( + 'Message queuing disabled for site %s', self.site.domain) + send.apply_async.assert_not_called() + + @ddt.data(0, 2, -3) + def test_send_enqueue_enabled(self, day_offset): + send = Mock(name='async_send_task') + current_date = datetime.datetime.now() + with patch.multiple( + self.basetask, + is_enqueue_enabled=Mock(return_value=True), + get_course_org_filter=Mock(return_value=(False, None)), + log_debug=DEFAULT, + run=send, + ) as patches: + self.basetask.enqueue( + site=self.site, + current_date=current_date, + day_offset=day_offset + ) + target_date = current_date.replace(hour=0, minute=0, second=0) + \ + datetime.timedelta(day_offset) + print(patches['log_debug'].mock_calls) + patches['log_debug'].assert_any_call( + 'Target date = %s', target_date.isoformat()) + assert send.call_count == DEFAULT_NUM_BINS + + @ddt.data(True, False) + def test_is_enqueue_enabled(self, enabled): + with patch.object(self.basetask, 'enqueue_config_var', 'enqueue_recurring_nudge'): + self.schedule_config.enqueue_recurring_nudge = enabled + self.schedule_config.save() + assert self.basetask.is_enqueue_enabled(self.site) == enabled + + @ddt.unpack + @ddt.data( + ('course1', ['course1']), + (['course1', 'course2'], ['course1', 'course2']) + ) + def test_get_course_org_filter_include(self, course_org_filter, expected_org_list): + self.site_config.values['course_org_filter'] = course_org_filter + self.site_config.save() + exclude_orgs, org_list = self.basetask.get_course_org_filter(self.site) + assert not exclude_orgs + assert org_list == expected_org_list + + @ddt.unpack + @ddt.data( + (None, []), + ('course1', [u'course1']), + (['course1', 'course2'], [u'course1', u'course2']) + ) + def test_get_course_org_filter_exclude(self, course_org_filter, expected_org_list): + self.other_site = SiteFactory.create() + self.other_site_config = SiteConfigurationFactory.create( + site=self.other_site, + values={'course_org_filter': course_org_filter}, + ) + exclude_orgs, org_list = self.basetask.get_course_org_filter(self.site) + assert exclude_orgs + self.assertItemsEqual(org_list, expected_org_list)