From 1accff9b787f1f5e45bb426003ca048122a9db26 Mon Sep 17 00:00:00 2001 From: Calen Pennington Date: Mon, 23 Oct 2017 23:51:41 -0400 Subject: [PATCH] DRY up more of tasks.py code --- .../core/djangoapps/schedules/resolvers.py | 38 +----- openedx/core/djangoapps/schedules/tasks.py | 125 +++++++++++------- 2 files changed, 83 insertions(+), 80 deletions(-) diff --git a/openedx/core/djangoapps/schedules/resolvers.py b/openedx/core/djangoapps/schedules/resolvers.py index dcee44dd80..b424788d3b 100644 --- a/openedx/core/djangoapps/schedules/resolvers.py +++ b/openedx/core/djangoapps/schedules/resolvers.py @@ -13,7 +13,6 @@ from django.utils.formats import dateformat, get_format from edx_ace.recipient_resolver import RecipientResolver from edx_ace.recipient import Recipient -from edx_ace.utils.date import serialize from courseware.date_summary import verified_upgrade_deadline_link, verified_upgrade_link_is_valid from openedx.core.djangoapps.monitoring_utils import function_trace, set_custom_metric @@ -69,8 +68,6 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver): self.current_datetime = self.target_datetime - datetime.timedelta(days=self.day_offset) def send(self, msg_type): - _annotate_for_monitoring(msg_type, self.site, self.bin_num, self.target_datetime, self.day_offset) - for (user, language, context) in self.schedules_for_bin(): msg = msg_type.personalize( Recipient( @@ -152,23 +149,6 @@ class BinnedSchedulesBaseResolver(PrefixedDebugLoggerMixin, RecipientResolver): return schedules -def _annotate_for_monitoring(message_type, site, bin_num, target_datetime, day_offset): - # This identifies the type of message being sent, for example: schedules.recurring_nudge3. - set_custom_metric('message_name', '{0}.{1}'.format( - message_type.app_label, message_type.name)) - # The domain name of the site we are sending the message for. - set_custom_metric('site', site.domain) - # This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery - # workers for too long. This could help us identify particular bins that are problematic. - set_custom_metric('bin', bin_num) - # The date we are processing data for. - set_custom_metric('target_day', serialize(target_datetime)) - # The number of days relative to the current date to process data for. - set_custom_metric('day_offset', day_offset) - # A unique identifier for this batch of messages being sent. - set_custom_metric('send_uuid', message_type.uuid) - - class ScheduleStartResolver(BinnedSchedulesBaseResolver): """ Send a message to all users whose schedule started at ``self.current_date`` + ``day_offset``. @@ -251,8 +231,7 @@ def _add_upsell_button_information_to_template_context(user, schedule, template_ enrollment = schedule.enrollment course = enrollment.course - verified_upgrade_link = _get_link_to_purchase_verified_certificate( - user, schedule) + verified_upgrade_link = _get_verified_upgrade_link(user, schedule) has_verified_upgrade_link = verified_upgrade_link is not None if has_verified_upgrade_link: @@ -269,12 +248,10 @@ def _add_upsell_button_information_to_template_context(user, schedule, template_ template_context['show_upsell'] = has_verified_upgrade_link -def _get_link_to_purchase_verified_certificate(a_user, a_schedule): - enrollment = a_schedule.enrollment - if enrollment.dynamic_upgrade_deadline is None or not verified_upgrade_link_is_valid(enrollment): - return None - - return verified_upgrade_deadline_link(a_user, enrollment.course) +def _get_verified_upgrade_link(user, schedule): + enrollment = schedule.enrollment + if enrollment.dynamic_upgrade_deadline is not None and verified_upgrade_link_is_valid(enrollment): + return verified_upgrade_deadline_link(user, enrollment.course) class CourseUpdateResolver(BinnedSchedulesBaseResolver): @@ -291,13 +268,11 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver): schedules = self.get_schedules_with_target_date_by_bin_and_orgs( order_by='enrollment__course', ) - LOG.debug('Course Update: Query = %r', schedules.query.sql_with_params()) for schedule in schedules: enrollment = schedule.enrollment try: - week_summary = get_course_week_summary( - enrollment.course_id, week_num) + week_summary = get_course_week_summary(enrollment.course_id, week_num) except CourseUpdateDoesNotExist: continue @@ -307,7 +282,6 @@ class CourseUpdateResolver(BinnedSchedulesBaseResolver): template_context = get_base_template_context(self.site) template_context.update({ 'student_name': user.profile.name, - 'user_personal_address': user.profile.name if user.profile.name else user.username, 'course_name': schedule.enrollment.course.display_name, 'course_url': absolute_url(self.site, reverse('course_root', args=[str(schedule.enrollment.course_id)])), 'week_num': week_num, diff --git a/openedx/core/djangoapps/schedules/tasks.py b/openedx/core/djangoapps/schedules/tasks.py index d76775c197..96c391f6c5 100644 --- a/openedx/core/djangoapps/schedules/tasks.py +++ b/openedx/core/djangoapps/schedules/tasks.py @@ -30,6 +30,11 @@ KNOWN_RETRY_ERRORS = ( # Errors we expect occasionally that could resolve on re ) +RECURRING_NUDGE_LOG_PREFIX = 'Recurring Nudge' +UPGRADE_REMINDER_LOG_PREFIX = 'Upgrade Reminder' +COURSE_UPDATE_LOG_PREFIX = 'Course Update' + + @task(bind=True, default_retry_delay=30, routing_key=ROUTING_KEY) def update_course_schedules(self, **kwargs): course_key = CourseKey.from_string(kwargs['course_id']) @@ -65,8 +70,7 @@ class ScheduleMessageBaseTask(Task): current_date = resolvers._get_datetime_beginning_of_day(current_date) if not cls.is_enqueue_enabled(site): - cls.log_debug( - 'Message queuing disabled for site %s', site.domain) + cls.log_debug('Message queuing disabled for site %s', site.domain) return exclude_orgs, org_list = cls.get_course_org_filter(site) @@ -132,9 +136,11 @@ class ScheduleMessageBaseTask(Task): self, site_id, target_day_str, day_offset, bin_num, org_list, exclude_orgs=False, override_recipient_email=None, ): msg_type = self.make_message_type(day_offset) + site = Site.objects.get(id=site_id) + _annotate_for_monitoring(msg_type, site, bin_num, target_day_str, day_offset) return self.resolver( self.async_send_task, - Site.objects.get(id=site_id), + site, deserialize(target_day_str), day_offset, bin_num, @@ -144,30 +150,43 @@ class ScheduleMessageBaseTask(Task): ).send(msg_type) def make_message_type(self, day_offset): - raise NotImplementedError() + raise NotImplementedError @task(ignore_result=True, routing_key=ROUTING_KEY) def _recurring_nudge_schedule_send(site_id, msg_str): - site = Site.objects.get(pk=site_id) - if not ScheduleConfig.current(site).deliver_recurring_nudge: - LOG.debug( - 'Recurring Nudge: Message delivery disabled for site %s', site.domain) - return + _schedule_send( + msg_str, + site_id, + 'deliver_recurring_nudge', + RECURRING_NUDGE_LOG_PREFIX, + ) - msg = Message.from_string(msg_str) - # A unique identifier for this batch of messages being sent. - set_custom_metric('send_uuid', msg.send_uuid) - # A unique identifier for this particular message. - set_custom_metric('uuid', msg.uuid) - LOG.debug('Recurring Nudge: Sending message = %s', msg_str) - ace.send(msg) + +@task(ignore_result=True, routing_key=ROUTING_KEY) +def _upgrade_reminder_schedule_send(site_id, msg_str): + _schedule_send( + msg_str, + site_id, + 'deliver_upgrade_reminder', + UPGRADE_REMINDER_LOG_PREFIX, + ) + + +@task(ignore_result=True, routing_key=ROUTING_KEY) +def _course_update_schedule_send(site_id, msg_str): + _schedule_send( + msg_str, + site_id, + 'deliver_course_update', + COURSE_UPDATE_LOG_PREFIX, + ) class ScheduleRecurringNudge(ScheduleMessageBaseTask): num_bins = resolvers.RECURRING_NUDGE_NUM_BINS enqueue_config_var = 'enqueue_recurring_nudge' - log_prefix = 'Scheduled Nudge' + log_prefix = RECURRING_NUDGE_LOG_PREFIX resolver = resolvers.ScheduleStartResolver async_send_task = _recurring_nudge_schedule_send @@ -175,24 +194,10 @@ class ScheduleRecurringNudge(ScheduleMessageBaseTask): return message_types.RecurringNudge(abs(day_offset)) -@task(ignore_result=True, routing_key=ROUTING_KEY) -def _upgrade_reminder_schedule_send(site_id, msg_str): - site = Site.objects.get(pk=site_id) - if not ScheduleConfig.current(site).deliver_upgrade_reminder: - return - - msg = Message.from_string(msg_str) - # A unique identifier for this batch of messages being sent. - set_custom_metric('send_uuid', msg.send_uuid) - # A unique identifier for this particular message. - set_custom_metric('uuid', msg.uuid) - ace.send(msg) - - class ScheduleUpgradeReminder(ScheduleMessageBaseTask): num_bins = resolvers.UPGRADE_REMINDER_NUM_BINS enqueue_config_var = 'enqueue_upgrade_reminder' - log_prefix = 'Course Update' + log_prefix = UPGRADE_REMINDER_LOG_PREFIX resolver = resolvers.UpgradeReminderResolver async_send_task = _upgrade_reminder_schedule_send @@ -200,27 +205,51 @@ class ScheduleUpgradeReminder(ScheduleMessageBaseTask): return message_types.UpgradeReminder() - -@task(ignore_result=True, routing_key=ROUTING_KEY) -def _course_update_schedule_send(site_id, msg_str): - site = Site.objects.get(pk=site_id) - if not ScheduleConfig.current(site).deliver_course_update: - return - - msg = Message.from_string(msg_str) - # A unique identifier for this batch of messages being sent. - set_custom_metric('send_uuid', msg.send_uuid) - # A unique identifier for this particular message. - set_custom_metric('uuid', msg.uuid) - ace.send(msg) - - class ScheduleCourseUpdate(ScheduleMessageBaseTask): num_bins = resolvers.COURSE_UPDATE_NUM_BINS enqueue_config_var = 'enqueue_course_update' - log_prefix = 'Course Update' + log_prefix = COURSE_UPDATE_LOG_PREFIX resolver = resolvers.CourseUpdateResolver async_send_task = _course_update_schedule_send def make_message_type(self, day_offset): return message_types.CourseUpdate() + + +def _schedule_send(msg_str, site_id, delivery_config_var, log_prefix): + if _is_delivery_enabled(site_id, delivery_config_var, log_prefix): + msg = Message.from_string(msg_str) + _annonate_send_task_for_monitoring(msg) + LOG.debug('%s: Sending message = %s', log_prefix, msg_str) + ace.send(msg) + + +def _is_delivery_enabled(site_id, delivery_config_var, log_prefix): + site = Site.objects.get(pk=site_id) + if getattr(ScheduleConfig.current(site), delivery_config_var, False): + return True + else: + LOG.debug('%s: Message delivery disabled for site %s', log_prefix, site.domain) + + +def _annotate_for_monitoring(message_type, site, bin_num, target_day_str, day_offset): + # This identifies the type of message being sent, for example: schedules.recurring_nudge3. + set_custom_metric('message_name', '{0}.{1}'.format(message_type.app_label, message_type.name)) + # The domain name of the site we are sending the message for. + set_custom_metric('site', site.domain) + # This is the "bin" of data being processed. We divide up the work into chunks so that we don't tie up celery + # workers for too long. This could help us identify particular bins that are problematic. + set_custom_metric('bin', bin_num) + # The date we are processing data for. + set_custom_metric('target_day', target_day_str) + # The number of days relative to the current date to process data for. + set_custom_metric('day_offset', day_offset) + # A unique identifier for this batch of messages being sent. + set_custom_metric('send_uuid', message_type.uuid) + + +def _annonate_send_task_for_monitoring(msg): + # A unique identifier for this batch of messages being sent. + set_custom_metric('send_uuid', msg.send_uuid) + # A unique identifier for this particular message. + set_custom_metric('uuid', msg.uuid)