Ran pyupgrade on lms/djangoapps (#26519)

* refactor: ran pyupgrade on lms/djangoapps/bulk_enroll
Co-authored-by: M. Zulqarnain <muhammad.zulqarnain@arbisoft.com>
This commit is contained in:
Usama Sadiq
2021-03-01 17:13:18 +05:00
committed by GitHub
parent e5b06ef959
commit 7f6cabbf98
23 changed files with 253 additions and 316 deletions

View File

@@ -7,7 +7,13 @@ from config_models.admin import ConfigurationModelAdmin
from django.contrib import admin
from lms.djangoapps.bulk_email.forms import CourseAuthorizationAdminForm, CourseEmailTemplateForm
from lms.djangoapps.bulk_email.models import BulkEmailFlag, CourseAuthorization, CourseEmail, CourseEmailTemplate, Optout # lint-amnesty, pylint: disable=line-too-long
from lms.djangoapps.bulk_email.models import (
BulkEmailFlag,
CourseAuthorization,
CourseEmail,
CourseEmailTemplate,
Optout
)
class CourseEmailAdmin(admin.ModelAdmin):
@@ -27,7 +33,7 @@ class CourseEmailTemplateAdmin(admin.ModelAdmin):
(None, {
# make the HTML template display above the plain template:
'fields': ('html_template', 'plain_template', 'name'),
'description': u'''
'description': '''
Enter template to be used by course staff when sending emails to enrolled students.
The HTML template is for HTML email, and may contain HTML markup. The plain template is

View File

@@ -45,5 +45,5 @@ def get_unsubscribed_link(username, course_id):
lms_root_url = configuration_helpers.get_value('LMS_ROOT_URL', settings.LMS_ROOT_URL)
token = UsernameCipher.encrypt(username)
optout_url = reverse('bulk_email_opt_out', kwargs={'token': token, 'course_id': course_id})
url = '{base_url}{optout_url}'.format(base_url=lms_root_url, optout_url=optout_url)
url = f'{lms_root_url}{optout_url}'
return url

View File

@@ -6,4 +6,4 @@ class BulkEmailConfig(AppConfig):
"""
Application Configuration for bulk_email.
"""
name = u'lms.djangoapps.bulk_email'
name = 'lms.djangoapps.bulk_email'

View File

@@ -19,7 +19,7 @@ class CourseEmailTemplateForm(forms.ModelForm):
name = forms.CharField(required=False)
class Meta(object):
class Meta:
model = CourseEmailTemplate
fields = ('html_template', 'plain_template', 'name')
@@ -27,11 +27,11 @@ class CourseEmailTemplateForm(forms.ModelForm):
"""Check the template for required tags."""
index = template.find(COURSE_EMAIL_MESSAGE_BODY_TAG)
if index < 0:
msg = u'Missing tag: "{}"'.format(COURSE_EMAIL_MESSAGE_BODY_TAG)
msg = f'Missing tag: "{COURSE_EMAIL_MESSAGE_BODY_TAG}"'
log.warning(msg)
raise ValidationError(msg)
if template.find(COURSE_EMAIL_MESSAGE_BODY_TAG, index + 1) >= 0:
msg = u'Multiple instances of tag: "{}"'.format(COURSE_EMAIL_MESSAGE_BODY_TAG)
msg = f'Multiple instances of tag: "{COURSE_EMAIL_MESSAGE_BODY_TAG}"'
log.warning(msg)
raise ValidationError(msg)
# TODO: add more validation here, including the set of known tags
@@ -63,7 +63,7 @@ class CourseEmailTemplateForm(forms.ModelForm):
try:
CourseEmailTemplate.get_template(name)
# already exists, this is no good
raise ValidationError(u'Name of "{}" already exists, this must be unique.'.format(name))
raise ValidationError(f'Name of "{name}" already exists, this must be unique.')
except CourseEmailTemplate.DoesNotExist:
# this is actually the successful validation
pass
@@ -73,7 +73,7 @@ class CourseEmailTemplateForm(forms.ModelForm):
class CourseAuthorizationAdminForm(forms.ModelForm):
"""Input form for email enabling, allowing us to verify data."""
class Meta(object):
class Meta:
model = CourseAuthorization
fields = '__all__'

View File

@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from django.conf import settings
from django.db import migrations, models
from opaque_keys.edx.django.models import CourseKeyField
@@ -32,7 +29,7 @@ class Migration(migrations.Migration):
('created', models.DateTimeField(auto_now_add=True)),
('modified', models.DateTimeField(auto_now=True)),
('course_id', CourseKeyField(max_length=255, db_index=True)),
('to_option', models.CharField(default=u'myself', max_length=64, choices=[(u'myself', u'Myself'), (u'staff', u'Staff and instructors'), (u'all', u'All')])),
('to_option', models.CharField(default='myself', max_length=64, choices=[('myself', 'Myself'), ('staff', 'Staff and instructors'), ('all', 'All')])),
('template_name', models.CharField(max_length=255, null=True)),
('from_addr', models.CharField(max_length=255, null=True)),
('sender', models.ForeignKey(default=1, blank=True, to=settings.AUTH_USER_MODEL, null=True, on_delete=models.CASCADE)),
@@ -57,6 +54,6 @@ class Migration(migrations.Migration):
),
migrations.AlterUniqueTogether(
name='optout',
unique_together=set([('user', 'course_id')]),
unique_together={('user', 'course_id')},
),
]

View File

@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from django.core.management import call_command
from django.db import migrations, models

View File

@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models

View File

@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from django.db import migrations, models
@@ -16,13 +13,13 @@ class Migration(migrations.Migration):
name='Target',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('target_type', models.CharField(max_length=64, choices=[(u'myself', u'Myself'), (u'staff', u'Staff and instructors'), (u'learners', u'All students'), (u'cohort', u'Specific cohort')])),
('target_type', models.CharField(max_length=64, choices=[('myself', 'Myself'), ('staff', 'Staff and instructors'), ('learners', 'All students'), ('cohort', 'Specific cohort')])),
],
),
migrations.AlterField(
model_name='courseemail',
name='to_option',
field=models.CharField(max_length=64, choices=[(u'deprecated', u'deprecated')]),
field=models.CharField(max_length=64, choices=[('deprecated', 'deprecated')]),
),
migrations.CreateModel(
name='CohortTarget',

View File

@@ -1,6 +1,3 @@
# -*- coding: utf-
from django.db import migrations, models
from django.db.utils import DatabaseError

View File

@@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from django.db import migrations, models
@@ -23,6 +20,6 @@ class Migration(migrations.Migration):
migrations.AlterField(
model_name='target',
name='target_type',
field=models.CharField(max_length=64, choices=[(u'myself', u'Myself'), (u'staff', u'Staff and instructors'), (u'learners', u'All students'), (u'cohort', u'Specific cohort'), (u'track', u'Specific course mode')]),
field=models.CharField(max_length=64, choices=[('myself', 'Myself'), ('staff', 'Staff and instructors'), ('learners', 'All students'), ('cohort', 'Specific cohort'), ('track', 'Specific course mode')]),
),
]

View File

@@ -6,25 +6,22 @@ Models for bulk email
import logging
import markupsafe
import six
from config_models.models import ConfigurationModel # lint-amnesty, pylint: disable=import-error
from config_models.models import ConfigurationModel
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
from django.db import models
from django.utils.encoding import python_2_unicode_compatible
from opaque_keys.edx.django.models import CourseKeyField # lint-amnesty, pylint: disable=import-error
from six import text_type
from six.moves import zip
from opaque_keys.edx.django.models import CourseKeyField
from common.djangoapps.course_modes.models import CourseMode
from common.djangoapps.student.roles import CourseInstructorRole, CourseStaffRole
from common.djangoapps.util.keyword_substitution import substitute_keywords_with_data
from common.djangoapps.util.query import use_read_replica_if_available
from openedx.core.djangoapps.course_groups.cohorts import get_cohort_by_name
from openedx.core.djangoapps.course_groups.models import CourseUserGroup
from openedx.core.djangoapps.enrollments.api import validate_course_mode
from openedx.core.djangoapps.enrollments.errors import CourseModeNotFoundError
from openedx.core.lib.html_to_text import html_to_text
from openedx.core.lib.mail_utils import wrap_message
from common.djangoapps.student.roles import CourseInstructorRole, CourseStaffRole
from common.djangoapps.util.keyword_substitution import substitute_keywords_with_data
from common.djangoapps.util.query import use_read_replica_if_available
log = logging.getLogger(__name__)
@@ -43,20 +40,20 @@ class Email(models.Model):
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
class Meta(object):
class Meta:
app_label = "bulk_email"
abstract = True
# Bulk email targets - the send to options that users can select from when they send email.
SEND_TO_MYSELF = u'myself'
SEND_TO_STAFF = u'staff'
SEND_TO_LEARNERS = u'learners'
SEND_TO_COHORT = u'cohort'
SEND_TO_TRACK = u'track'
SEND_TO_MYSELF = 'myself'
SEND_TO_STAFF = 'staff'
SEND_TO_LEARNERS = 'learners'
SEND_TO_COHORT = 'cohort'
SEND_TO_TRACK = 'track'
EMAIL_TARGET_CHOICES = list(zip(
[SEND_TO_MYSELF, SEND_TO_STAFF, SEND_TO_LEARNERS, SEND_TO_COHORT, SEND_TO_TRACK],
[u'Myself', u'Staff and instructors', u'All students', u'Specific cohort', u'Specific course mode']
['Myself', 'Staff and instructors', 'All students', 'Specific cohort', 'Specific course mode']
))
EMAIL_TARGETS = {target[0] for target in EMAIL_TARGET_CHOICES}
@@ -78,11 +75,11 @@ class Target(models.Model):
"""
target_type = models.CharField(max_length=64, choices=EMAIL_TARGET_CHOICES)
class Meta(object):
class Meta:
app_label = "bulk_email"
def __str__(self):
return "CourseEmail Target: {}".format(self.short_display())
return f"CourseEmail Target: {self.short_display()}"
def short_display(self):
"""
@@ -142,7 +139,7 @@ class Target(models.Model):
)
)
else:
raise ValueError(u"Unrecognized target type {}".format(self.target_type))
raise ValueError(f"Unrecognized target type {self.target_type}")
@python_2_unicode_compatible
@@ -159,16 +156,16 @@ class CohortTarget(Target):
def __init__(self, *args, **kwargs):
kwargs['target_type'] = SEND_TO_COHORT
super(CohortTarget, self).__init__(*args, **kwargs) # lint-amnesty, pylint: disable=super-with-arguments
super().__init__(*args, **kwargs)
def __str__(self):
return self.short_display()
def short_display(self):
return "{}-{}".format(self.target_type, self.cohort.name)
return f"{self.target_type}-{self.cohort.name}"
def long_display(self):
return "Cohort: {}".format(self.cohort.name)
return f"Cohort: {self.cohort.name}"
@classmethod
def ensure_valid_cohort(cls, cohort_name, course_id):
@@ -183,7 +180,7 @@ class CohortTarget(Target):
cohort = get_cohort_by_name(name=cohort_name, course_key=course_id)
except CourseUserGroup.DoesNotExist:
raise ValueError( # lint-amnesty, pylint: disable=raise-missing-from
u"Cohort {cohort} does not exist in course {course_id}".format(
"Cohort {cohort} does not exist in course {course_id}".format(
cohort=cohort_name,
course_id=course_id
).encode('utf8')
@@ -205,20 +202,20 @@ class CourseModeTarget(Target):
def __init__(self, *args, **kwargs):
kwargs['target_type'] = SEND_TO_TRACK
super(CourseModeTarget, self).__init__(*args, **kwargs) # lint-amnesty, pylint: disable=super-with-arguments
super().__init__(*args, **kwargs)
def __str__(self):
return self.short_display()
def short_display(self):
return "{}-{}".format(self.target_type, self.track.mode_slug) # pylint: disable=no-member
return f"{self.target_type}-{self.track.mode_slug}" # pylint: disable=no-member
def long_display(self):
course_mode = self.track
long_course_mode_display = u'Course mode: {}'.format(course_mode.mode_display_name)
long_course_mode_display = f'Course mode: {course_mode.mode_display_name}'
if course_mode.mode_slug not in CourseMode.AUDIT_MODES:
mode_currency = u'Currency: {}'.format(course_mode.currency)
long_course_mode_display = u'{}, {}'.format(long_course_mode_display, mode_currency)
mode_currency = f'Currency: {course_mode.currency}'
long_course_mode_display = f'{long_course_mode_display}, {mode_currency}'
return long_course_mode_display
@classmethod
@@ -229,10 +226,10 @@ class CourseModeTarget(Target):
if mode_slug is None:
raise ValueError("Cannot create a CourseModeTarget without specifying a mode_slug.")
try:
validate_course_mode(six.text_type(course_id), mode_slug, include_expired=True)
validate_course_mode(str(course_id), mode_slug, include_expired=True)
except CourseModeNotFoundError:
raise ValueError( # lint-amnesty, pylint: disable=raise-missing-from
u"Track {track} does not exist in course {course_id}".format(
"Track {track} does not exist in course {course_id}".format(
track=mode_slug,
course_id=course_id
).encode('utf8')
@@ -246,12 +243,12 @@ class CourseEmail(Email):
.. no_pii:
"""
class Meta(object):
class Meta:
app_label = "bulk_email"
course_id = CourseKeyField(max_length=255, db_index=True)
# to_option is deprecated and unused, but dropping db columns is hard so it's still here for legacy reasons
to_option = models.CharField(max_length=64, choices=[(u"deprecated", u"deprecated")])
to_option = models.CharField(max_length=64, choices=[("deprecated", "deprecated")])
targets = models.ManyToManyField(Target)
template_name = models.CharField(null=True, max_length=255)
from_addr = models.CharField(null=True, max_length=255)
@@ -276,7 +273,7 @@ class CourseEmail(Email):
target_split = target.split(':', 1)
# Ensure our desired target exists
if target_split[0] not in EMAIL_TARGETS: # lint-amnesty, pylint: disable=no-else-raise
fmt = u'Course email being sent to unrecognized target: "{target}" for "{course}", subject "{subject}"'
fmt = 'Course email being sent to unrecognized target: "{target}" for "{course}", subject "{subject}"'
msg = fmt.format(target=target, course=course_id, subject=subject).encode('utf8')
raise ValueError(msg)
elif target_split[0] == SEND_TO_COHORT:
@@ -330,7 +327,7 @@ class Optout(models.Model):
user = models.ForeignKey(User, db_index=True, null=True, on_delete=models.CASCADE)
course_id = CourseKeyField(max_length=255, db_index=True)
class Meta(object):
class Meta:
app_label = "bulk_email"
unique_together = ('user', 'course_id')
@@ -358,7 +355,7 @@ class CourseEmailTemplate(models.Model):
.. no_pii:
"""
class Meta(object):
class Meta:
app_label = "bulk_email"
html_template = models.TextField(null=True, blank=True)
@@ -428,8 +425,8 @@ class CourseEmailTemplate(models.Model):
stored HTML template and the provided `context` dict.
"""
# HTML-escape string values in the context (used for keyword substitution).
for key, value in six.iteritems(context):
if isinstance(value, six.string_types):
for key, value in context.items():
if isinstance(value, str):
context[key] = markupsafe.escape(value)
return CourseEmailTemplate._render(self.html_template, htmltext, context)
@@ -441,7 +438,7 @@ class CourseAuthorization(models.Model):
.. no_pii:
"""
class Meta(object):
class Meta:
app_label = "bulk_email"
# The course that these features are attached to.
@@ -465,7 +462,7 @@ class CourseAuthorization(models.Model):
not_en = "Not "
if self.email_enabled:
not_en = ""
return u"Course '{}': Instructor Email {}Enabled".format(text_type(self.course_id), not_en)
return "Course '{}': Instructor Email {}Enabled".format(str(self.course_id), not_en)
@python_2_unicode_compatible
@@ -516,12 +513,12 @@ class BulkEmailFlag(ConfigurationModel):
else: # implies enabled == True and require_course_email == False, so email is globally enabled
return True
class Meta(object):
class Meta:
app_label = "bulk_email"
def __str__(self):
current_model = BulkEmailFlag.current()
return u"BulkEmailFlag: enabled {}, require_course_email_auth: {}".format(
return "BulkEmailFlag: enabled {}, require_course_email_auth: {}".format(
current_model.is_enabled(),
current_model.require_course_email_auth
)

View File

@@ -5,8 +5,8 @@ Signal handlers for the bulk_email app
from django.dispatch import receiver
from openedx.core.djangoapps.user_api.accounts.signals import USER_RETIRE_MAILINGS
from common.djangoapps.student.models import CourseEnrollment
from openedx.core.djangoapps.user_api.accounts.signals import USER_RETIRE_MAILINGS
from .models import Optout

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
This module contains celery task functions for handling the sending of bulk email
to a course.
@@ -26,23 +25,24 @@ from boto.ses.exceptions import (
SESLocalAddressCharacterError,
SESMaxSendingRateExceededError
)
from celery import current_task, shared_task # lint-amnesty, pylint: disable=import-error
from celery.exceptions import RetryTaskError # lint-amnesty, pylint: disable=import-error
from celery.states import FAILURE, RETRY, SUCCESS # lint-amnesty, pylint: disable=import-error
from django.conf import settings # lint-amnesty, pylint: disable=wrong-import-order
from django.core.mail import EmailMultiAlternatives, get_connection # lint-amnesty, pylint: disable=wrong-import-order
from django.core.mail.message import forbid_multi_line_headers # lint-amnesty, pylint: disable=wrong-import-order
from django.urls import reverse # lint-amnesty, pylint: disable=wrong-import-order
from django.utils import timezone # lint-amnesty, pylint: disable=wrong-import-order
from django.utils.translation import override as override_language # lint-amnesty, pylint: disable=wrong-import-order
from django.utils.translation import ugettext as _ # lint-amnesty, pylint: disable=wrong-import-order
from edx_django_utils.monitoring import set_code_owner_attribute # lint-amnesty, pylint: disable=wrong-import-order
from markupsafe import escape # lint-amnesty, pylint: disable=wrong-import-order
from six import text_type # lint-amnesty, pylint: disable=wrong-import-order
from celery import current_task, shared_task
from celery.exceptions import RetryTaskError
from celery.states import FAILURE, RETRY, SUCCESS
from django.conf import settings
from django.core.mail import EmailMultiAlternatives, get_connection
from django.core.mail.message import forbid_multi_line_headers
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import override as override_language
from django.utils.translation import ugettext as _
from edx_django_utils.monitoring import set_code_owner_attribute
from markupsafe import escape
from common.djangoapps.util.date_utils import get_default_time_display
from common.djangoapps.util.string_utils import _has_non_ascii_characters
from lms.djangoapps.branding.api import get_logo_url_for_email
from lms.djangoapps.bulk_email.models import CourseEmail, Optout
from lms.djangoapps.bulk_email.api import get_unsubscribed_link
from lms.djangoapps.bulk_email.models import CourseEmail, Optout
from lms.djangoapps.courseware.courses import get_course
from lms.djangoapps.instructor_task.models import InstructorTask
from lms.djangoapps.instructor_task.subtasks import (
@@ -53,8 +53,6 @@ from lms.djangoapps.instructor_task.subtasks import (
)
from openedx.core.djangoapps.site_configuration import helpers as configuration_helpers
from openedx.core.lib.courses import course_image_url
from common.djangoapps.util.date_utils import get_default_time_display
from common.djangoapps.util.string_utils import _has_non_ascii_characters
log = logging.getLogger('edx.celery.task')
@@ -103,7 +101,7 @@ def _get_course_email_context(course):
"""
Returns context arguments to apply to all emails, independent of recipient.
"""
course_id = text_type(course.id)
course_id = str(course.id)
course_title = course.display_name
course_end_date = get_default_time_display(course.end)
course_root = reverse('course_root', kwargs={'course_id': course_id})
@@ -111,7 +109,7 @@ def _get_course_email_context(course):
settings.LMS_ROOT_URL,
course_root
)
image_url = u'{}{}'.format(settings.LMS_ROOT_URL, course_image_url(course))
image_url = '{}{}'.format(settings.LMS_ROOT_URL, course_image_url(course))
lms_root_url = configuration_helpers.get_value('LMS_ROOT_URL', settings.LMS_ROOT_URL)
email_context = {
'course_title': course_title,
@@ -143,8 +141,8 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
# Perfunctory check, since expansion is made for convenience of other task
# code that doesn't need the entry_id.
if course_id != entry.course_id:
format_msg = u"Course id conflict: explicit value %r does not match task value %r"
log.warning(u"Task %s: " + format_msg, task_id, course_id, entry.course_id) # lint-amnesty, pylint: disable=logging-not-lazy
format_msg = "Course id conflict: explicit value %r does not match task value %r"
log.warning("Task %s: " + format_msg, task_id, course_id, entry.course_id) # lint-amnesty, pylint: disable=logging-not-lazy
raise ValueError(format_msg % (course_id, entry.course_id))
# Fetch the CourseEmail.
@@ -154,7 +152,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
except CourseEmail.DoesNotExist:
# The CourseEmail object should be committed in the view function before the task
# is submitted and reaches this point.
log.warning(u"Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
log.warning("Task %s: Failed to get CourseEmail with id %s", task_id, email_id)
raise
# Check to see if email batches have already been defined. This seems to
@@ -165,14 +163,14 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
# So we just return right away. We don't raise an exception, because we want
# the current task to be marked with whatever it had been marked with before.
if len(entry.subtasks) > 0 and len(entry.task_output) > 0:
log.warning(u"Task %s has already been processed for email %s! InstructorTask = %s", task_id, email_id, entry)
log.warning("Task %s has already been processed for email %s! InstructorTask = %s", task_id, email_id, entry)
progress = json.loads(entry.task_output)
return progress
# Sanity check that course for email_obj matches that of the task referencing it.
if course_id != email_obj.course_id:
format_msg = u"Course id conflict: explicit value %r does not match email value %r"
log.warning(u"Task %s: " + format_msg, task_id, course_id, email_obj.course_id) # lint-amnesty, pylint: disable=logging-not-lazy
format_msg = "Course id conflict: explicit value %r does not match email value %r"
log.warning("Task %s: " + format_msg, task_id, course_id, email_obj.course_id) # lint-amnesty, pylint: disable=logging-not-lazy
raise ValueError(format_msg % (course_id, email_obj.course_id))
# Fetch the course object.
@@ -192,7 +190,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
else recipient_qsets[0]
recipient_fields = ['profile__name', 'email', 'username']
log.info(u"Task %s: Preparing to queue subtasks for sending emails for course %s, email %s",
log.info("Task %s: Preparing to queue subtasks for sending emails for course %s, email %s",
task_id, course_id, email_id)
total_recipients = combined_set.count()
@@ -200,7 +198,7 @@ def perform_delegate_email_batches(entry_id, course_id, task_input, action_name)
# Weird things happen if we allow empty querysets as input to emailing subtasks
# The task appears to hang at "0 out of 0 completed" and never finishes.
if total_recipients == 0:
msg = u"Bulk Email Task: Empty recipient set"
msg = "Bulk Email Task: Empty recipient set"
log.warning(msg)
raise ValueError(msg)
@@ -276,8 +274,8 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
subtask_status = SubtaskStatus.from_dict(subtask_status_dict)
current_task_id = subtask_status.task_id
num_to_send = len(to_list)
log.info((u"Preparing to send email %s to %d recipients as subtask %s "
u"for instructor task %d: context = %s, status=%s, time=%s"),
log.info(("Preparing to send email %s to %d recipients as subtask %s "
"for instructor task %d: context = %s, status=%s, time=%s"),
email_id, num_to_send, current_task_id, entry_id, global_email_context, subtask_status, datetime.now())
# Check that the requested subtask is actually known to the current InstructorTask entry.
@@ -303,14 +301,14 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
subtask_status,
)
log.info(
u"BulkEmail ==> _send_course_email completed in : %s for task : %s with recipient count: %s",
"BulkEmail ==> _send_course_email completed in : %s for task : %s with recipient count: %s",
time.time() - start_time,
subtask_status.task_id,
len(to_list)
)
except Exception:
# Unexpected exception. Try to write out the failure to the entry before failing.
log.exception(u"Send-email task %s for email %s: failed unexpectedly!", current_task_id, email_id)
log.exception("Send-email task %s for email %s: failed unexpectedly!", current_task_id, email_id)
# We got here for really unexpected reasons. Since we don't know how far
# the task got in emailing, we count all recipients as having failed.
# It at least keeps the counts consistent.
@@ -320,22 +318,22 @@ def send_course_email(entry_id, email_id, to_list, global_email_context, subtask
if send_exception is None:
# Update the InstructorTask object that is storing its progress.
log.info(u"Send-email task %s for email %s: succeeded", current_task_id, email_id)
log.info("Send-email task %s for email %s: succeeded", current_task_id, email_id)
update_subtask_status(entry_id, current_task_id, new_subtask_status)
elif isinstance(send_exception, RetryTaskError):
# If retrying, a RetryTaskError needs to be returned to Celery.
# We assume that the the progress made before the retry condition
# was encountered has already been updated before the retry call was made,
# so we only log here.
log.warning(u"Send-email task %s for email %s: being retried", current_task_id, email_id)
log.warning("Send-email task %s for email %s: being retried", current_task_id, email_id)
raise send_exception # pylint: disable=raising-bad-type
else:
log.error(u"Send-email task %s for email %s: failed: %s", current_task_id, email_id, send_exception)
log.error("Send-email task %s for email %s: failed: %s", current_task_id, email_id, send_exception)
update_subtask_status(entry_id, current_task_id, new_subtask_status)
raise send_exception # pylint: disable=raising-bad-type
# return status in a form that can be serialized by Celery into JSON:
log.info(u"Send-email task %s for email %s: returning status %s", current_task_id, email_id, new_subtask_status)
log.info("Send-email task %s for email %s: returning status %s", current_task_id, email_id, new_subtask_status)
return new_subtask_status.to_dict()
@@ -388,10 +386,10 @@ def _get_source_address(course_id, course_title, course_language, truncate=True)
# RFC2821 requires the byte order of the email address to be the name then email
# e.g. "John Doe <email@example.com>"
# Although the display will be flipped in RTL languages, the byte order is still the same.
from_addr_format = u'{name} {email}'.format(
from_addr_format = '{name} {email}'.format(
# Translators: Bulk email from address e.g. ("Physics 101" Course Staff)
name=_(u'"{course_title}" Course Staff'),
email=u'<{course_name}-{from_email}>', # xss-lint: disable=python-wrap-html
name=_('"{course_title}" Course Staff'),
email='<{course_name}-{from_email}>', # xss-lint: disable=python-wrap-html
)
def format_address(course_title_no_quotes):
@@ -465,7 +463,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
recipients_info = Counter()
log.info(
u"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, TotalRecipients: %s",
"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, TotalRecipients: %s",
parent_task_id,
task_id,
email_id,
@@ -476,7 +474,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
course_email = CourseEmail.objects.get(id=email_id)
except CourseEmail.DoesNotExist as exc:
log.exception(
u"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Could not find email to send.",
"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Could not find email to send.",
parent_task_id,
task_id,
email_id
@@ -525,8 +523,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
to_list.pop()
total_recipients_failed += 1
log.info(
u"BulkEmail ==> Email address %s contains non-ascii characters. Skipping sending "
u"email to %s, EmailId: %s ",
"BulkEmail ==> Email address %s contains non-ascii characters. Skipping sending "
"email to %s, EmailId: %s ",
email,
current_recipient['profile__name'],
email_id
@@ -539,7 +537,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
email_context['user_id'] = current_recipient['pk']
email_context['course_id'] = course_email.course_id
email_context['unsubscribe_link'] = get_unsubscribed_link(current_recipient['username'],
text_type(course_email.course_id))
str(course_email.course_id))
# Construct message content using templates and context:
plaintext_msg = course_email_template.render_plaintext(course_email.text_message, email_context)
@@ -565,7 +563,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
try:
log.info(
u"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Recipient num: %s/%s, \
"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Recipient num: %s/%s, \
Recipient name: %s, Email address: %s",
parent_task_id,
task_id,
@@ -581,7 +579,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# According to SMTP spec, we'll retry error codes in the 4xx range. 5xx range indicates hard failure.
total_recipients_failed += 1
log.error(
u"BulkEmail ==> Status: Failed(SMTPDataError), Task: %s, SubTask: %s, EmailId: %s, \
"BulkEmail ==> Status: Failed(SMTPDataError), Task: %s, SubTask: %s, EmailId: %s, \
Recipient num: %s/%s, Email address: %s",
parent_task_id,
task_id,
@@ -596,7 +594,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
else:
# This will fall through and not retry the message.
log.warning(
u'BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Recipient num: %s/%s, \
'BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Recipient num: %s/%s, \
Email not delivered to %s due to error %s',
parent_task_id,
task_id,
@@ -612,7 +610,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# This will fall through and not retry the message.
total_recipients_failed += 1
log.error(
u"BulkEmail ==> Status: Failed(SINGLE_EMAIL_FAILURE_ERRORS), Task: %s, SubTask: %s, \
"BulkEmail ==> Status: Failed(SINGLE_EMAIL_FAILURE_ERRORS), Task: %s, SubTask: %s, \
EmailId: %s, Recipient num: %s/%s, Email address: %s, Exception: %s",
parent_task_id,
task_id,
@@ -627,7 +625,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
else:
total_recipients_successful += 1
log.info(
u"BulkEmail ==> Status: Success, Task: %s, SubTask: %s, EmailId: %s, \
"BulkEmail ==> Status: Success, Task: %s, SubTask: %s, EmailId: %s, \
Recipient num: %s/%s, Email address: %s,",
parent_task_id,
task_id,
@@ -637,9 +635,9 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
email
)
if settings.BULK_EMAIL_LOG_SENT_EMAILS:
log.info(u'Email with id %s sent to %s', email_id, email)
log.info('Email with id %s sent to %s', email_id, email)
else:
log.debug(u'Email with id %s sent to %s', email_id, email)
log.debug('Email with id %s sent to %s', email_id, email)
subtask_status.increment(succeeded=1)
# Pop the user that was emailed off the end of the list only once they have
@@ -649,7 +647,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
to_list.pop()
log.info(
u"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Total Successful Recipients: %s/%s, \
"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Total Successful Recipients: %s/%s, \
Failed Recipients: %s/%s, Time Taken: %s",
parent_task_id,
task_id,
@@ -660,11 +658,11 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
total_recipients,
time.time() - start_time
)
duplicate_recipients = [u"{0} ({1})".format(email, repetition)
duplicate_recipients = [f"{email} ({repetition})"
for email, repetition in recipients_info.most_common() if repetition > 1]
if duplicate_recipients:
log.info(
u"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Total Duplicate Recipients [%s]: [%s]",
"BulkEmail ==> Task: %s, SubTask: %s, EmailId: %s, Total Duplicate Recipients [%s]: [%s]",
parent_task_id,
task_id,
email_id,
@@ -693,8 +691,8 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
except BULK_EMAIL_FAILURE_ERRORS as exc:
num_pending = len(to_list)
log.exception((u'Task %s: email with id %d caused send_course_email task to fail '
u'with u"fatal" exception. %d emails unsent.'),
log.exception(('Task %s: email with id %d caused send_course_email task to fail '
'with u"fatal" exception. %d emails unsent.'),
task_id, email_id, num_pending)
# Update counters with progress to date, counting unsent emails as failures,
# and set the state to FAILURE:
@@ -706,7 +704,7 @@ def _send_course_email(entry_id, email_id, to_list, global_email_context, subtas
# without popping the current recipient off of the existing list.
# These are unexpected errors. Since they might be due to a temporary condition that might
# succeed on retry, we give them a retry.
log.exception((u'Task %s: email with id %d caused send_course_email task to fail '
log.exception(('Task %s: email with id %d caused send_course_email task to fail '
'with unexpected exception. Generating retry.'),
task_id, email_id)
# Increment the "retried_withmax" counter, update other counters with progress to date,
@@ -769,7 +767,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context,
Otherwise, it (ought to be) the current_exception passed in.
"""
task_id = subtask_status.task_id
log.info(u"Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
log.info("Task %s: Successfully sent to %s users; failed to send to %s users (and skipped %s users)",
task_id, subtask_status.succeeded, subtask_status.failed, subtask_status.skipped)
# Calculate time until we retry this task (in seconds):
@@ -794,8 +792,8 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context,
# retries are deferred by the same amount.
countdown = ((2 ** retry_index) * base_delay) * random.uniform(.75, 1.25)
log.warning((u'Task %s: email with id %d not delivered due to %s error %s, '
u'retrying send to %d recipients in %s seconds (with max_retry=%s)'),
log.warning(('Task %s: email with id %d not delivered due to %s error %s, '
'retrying send to %d recipients in %s seconds (with max_retry=%s)'),
task_id, email_id, exception_type, current_exception, len(to_list), countdown, max_retries)
# we make sure that we update the InstructorTask with the current subtask status
@@ -824,7 +822,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context,
except RetryTaskError as retry_error:
# If the retry call is successful, update with the current progress:
log.info(
u'Task %s: email with id %d caused send_course_email task to retry again.',
'Task %s: email with id %d caused send_course_email task to retry again.',
task_id,
email_id
)
@@ -835,7 +833,7 @@ def _submit_for_retry(entry_id, email_id, to_list, global_email_context,
# (and put it in retry_exc just in case it's different, but it shouldn't be),
# and update status as if it were any other failure. That means that
# the recipients still in the to_list are counted as failures.
log.exception(u'Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
log.exception('Task %s: email with id %d caused send_course_email task to fail to retry. To list: %s',
task_id, email_id, [i['email'] for i in to_list])
num_failed = len(to_list)
subtask_status.increment(failed=num_failed, state=FAILURE)

View File

@@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
"""
Unit tests for student optouts from course email
"""
import json
from unittest.mock import Mock, patch
from django.core import mail
from django.core.management import call_command
@@ -13,18 +13,15 @@ from edx_ace.channel import ChannelType
from edx_ace.message import Message
from edx_ace.policy import PolicyResult
from edx_ace.recipient import Recipient
from mock import Mock, patch
from six import text_type
from lms.djangoapps.bulk_email.models import BulkEmailFlag
from lms.djangoapps.bulk_email.policies import CourseEmailOptout
from common.djangoapps.student.models import CourseEnrollment
from common.djangoapps.student.tests.factories import AdminFactory, CourseEnrollmentFactory, UserFactory
from lms.djangoapps.bulk_email.api import get_unsubscribed_link
from lms.djangoapps.bulk_email.models import BulkEmailFlag
from lms.djangoapps.bulk_email.policies import CourseEmailOptout
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from lms.djangoapps.bulk_email.api import get_unsubscribed_link
@patch('lms.djangoapps.bulk_email.models.html_to_text', Mock(return_value='Mocking CourseEmail.text_message', autospec=True)) # lint-amnesty, pylint: disable=line-too-long
class TestOptoutCourseEmails(ModuleStoreTestCase):
@@ -33,8 +30,8 @@ class TestOptoutCourseEmails(ModuleStoreTestCase):
"""
def setUp(self):
super(TestOptoutCourseEmails, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
course_title = u"ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
super().setUp()
course_title = "ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
self.course = CourseFactory.create(run='testcourse1', display_name=course_title)
self.instructor = AdminFactory.create()
self.student = UserFactory.create()
@@ -45,9 +42,9 @@ class TestOptoutCourseEmails(ModuleStoreTestCase):
self.client.login(username=self.student.username, password="test")
self.send_mail_url = reverse('send_email', kwargs={'course_id': text_type(self.course.id)})
self.send_mail_url = reverse('send_email', kwargs={'course_id': str(self.course.id)})
self.success_content = {
'course_id': text_type(self.course.id),
'course_id': str(self.course.id),
'success': True,
}
BulkEmailFlag.objects.create(enabled=True, require_course_email_auth=False)
@@ -55,7 +52,7 @@ class TestOptoutCourseEmails(ModuleStoreTestCase):
def navigate_to_email_view(self):
"""Navigate to the instructor dash's email view"""
# Pull up email view on instructor dashboard
url = reverse('instructor_dashboard', kwargs={'course_id': text_type(self.course.id)})
url = reverse('instructor_dashboard', kwargs={'course_id': str(self.course.id)})
response = self.client.get(url)
email_section = '<div class="vert-left send-email" id="section-send-email">'
# If this fails, it is likely because BulkEmailFlag.is_enabled() is set to False
@@ -68,7 +65,7 @@ class TestOptoutCourseEmails(ModuleStoreTestCase):
url = reverse('change_email_settings')
# This is a checkbox, so on the post of opting out (that is, an Un-check of the box),
# the Post that is sent will not contain 'receive_emails'
response = self.client.post(url, {'course_id': text_type(self.course.id)})
response = self.client.post(url, {'course_id': str(self.course.id)})
assert json.loads(response.content.decode('utf-8')) == {'success': True}
self.client.logout()
@@ -97,7 +94,7 @@ class TestOptoutCourseEmails(ModuleStoreTestCase):
self.client.login(username=self.instructor.username, password="test")
unsubscribe_link = get_unsubscribed_link(self.student.username, text_type(self.course.id))
unsubscribe_link = get_unsubscribed_link(self.student.username, str(self.course.id))
response = self.client.post(unsubscribe_link, {'unsubscribe': True})
assert response.status_code == 200
@@ -118,7 +115,7 @@ class TestOptoutCourseEmails(ModuleStoreTestCase):
Make sure student receives course email after opting in.
"""
url = reverse('change_email_settings')
response = self.client.post(url, {'course_id': text_type(self.course.id), 'receive_emails': 'on'})
response = self.client.post(url, {'course_id': str(self.course.id), 'receive_emails': 'on'})
assert json.loads(response.content.decode('utf-8')) == {'success': True}
self.client.logout()
@@ -151,8 +148,8 @@ class TestACEOptoutCourseEmails(ModuleStoreTestCase):
"""
def setUp(self):
super(TestACEOptoutCourseEmails, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
course_title = u"ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
super().setUp()
course_title = "ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
self.course = CourseFactory.create(run='testcourse1', display_name=course_title)
self.instructor = AdminFactory.create()
self.student = UserFactory.create()
@@ -167,7 +164,7 @@ class TestACEOptoutCourseEmails(ModuleStoreTestCase):
url = reverse('change_email_settings')
# This is a checkbox, so on the post of opting out (that is, an Un-check of the box),
# the Post that is sent will not contain 'receive_emails'
post_data = {'course_id': text_type(self.course.id)}
post_data = {'course_id': str(self.course.id)}
if not opted_out:
post_data['receive_emails'] = 'on'

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Unit tests for sending course email
"""
@@ -7,10 +6,9 @@ Unit tests for sending course email
import json
import os
from unittest import skipIf
from unittest.mock import Mock, patch
import ddt
import six
from six.moves import range
from django.conf import settings
from django.core import mail
from django.core.mail.message import forbid_multi_line_headers
@@ -19,30 +17,29 @@ from django.test.utils import override_settings
from django.urls import reverse
from django.utils.translation import get_language
from markupsafe import escape
from mock import Mock, patch
from common.djangoapps.course_modes.models import CourseMode
from common.djangoapps.student.models import CourseEnrollment
from common.djangoapps.student.roles import CourseStaffRole
from common.djangoapps.student.tests.factories import CourseEnrollmentFactory, UserFactory
from lms.djangoapps.bulk_email.tasks import _get_course_email_context, _get_source_address
from lms.djangoapps.courseware.tests.factories import InstructorFactory, StaffFactory
from lms.djangoapps.instructor_task.subtasks import update_subtask_status
from openedx.core.djangoapps.course_groups.cohorts import add_user_to_cohort
from openedx.core.djangoapps.course_groups.models import CourseCohort
from openedx.core.djangoapps.enrollments.api import update_enrollment
from xmodule.modulestore import ModuleStoreEnum
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from ..models import BulkEmailFlag, Optout
from lms.djangoapps.bulk_email.tasks import _get_course_email_context, _get_source_address # lint-amnesty, pylint: disable=wrong-import-order
from common.djangoapps.course_modes.models import CourseMode # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.courseware.tests.factories import InstructorFactory, StaffFactory # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.instructor_task.subtasks import update_subtask_status # lint-amnesty, pylint: disable=wrong-import-order
from openedx.core.djangoapps.course_groups.cohorts import add_user_to_cohort # lint-amnesty, pylint: disable=wrong-import-order
from openedx.core.djangoapps.course_groups.models import CourseCohort # lint-amnesty, pylint: disable=wrong-import-order
from openedx.core.djangoapps.enrollments.api import update_enrollment # lint-amnesty, pylint: disable=wrong-import-order
from common.djangoapps.student.models import CourseEnrollment # lint-amnesty, pylint: disable=wrong-import-order
from common.djangoapps.student.roles import CourseStaffRole # lint-amnesty, pylint: disable=wrong-import-order
from common.djangoapps.student.tests.factories import CourseEnrollmentFactory, UserFactory # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore import ModuleStoreEnum # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.tests.factories import CourseFactory # lint-amnesty, pylint: disable=wrong-import-order
STAFF_COUNT = 3
STUDENT_COUNT = 10
LARGE_NUM_EMAILS = 137
class MockCourseEmailResult(object):
class MockCourseEmailResult:
"""
A small closure-like class to keep count of emails sent over all tasks, recorded
by mock object side effects
@@ -95,7 +92,7 @@ class EmailSendFromDashboardTestCase(SharedModuleStoreTestCase):
Goes to the instructor dashboard to verify that the email section is
there.
"""
url = reverse('instructor_dashboard', kwargs={'course_id': six.text_type(self.course.id)})
url = reverse('instructor_dashboard', kwargs={'course_id': str(self.course.id)})
# Response loads the whole instructor dashboard, so no need to explicitly
# navigate to a particular email section
response = self.client.get(url)
@@ -105,15 +102,15 @@ class EmailSendFromDashboardTestCase(SharedModuleStoreTestCase):
@classmethod
def setUpClass(cls):
super(EmailSendFromDashboardTestCase, cls).setUpClass()
course_title = u"ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
super().setUpClass()
course_title = "ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
cls.course = CourseFactory.create(
display_name=course_title,
default_store=ModuleStoreEnum.Type.split
)
def setUp(self):
super(EmailSendFromDashboardTestCase, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
super().setUp()
BulkEmailFlag.objects.create(enabled=True, require_course_email_auth=False)
self.create_staff_and_instructor()
self.create_students()
@@ -126,19 +123,19 @@ class EmailSendFromDashboardTestCase(SharedModuleStoreTestCase):
# Pulling up the instructor dash email view here allows us to test sending emails in tests
self.goto_instructor_dash_email_view()
self.send_mail_url = reverse(
'send_email', kwargs={'course_id': six.text_type(self.course.id)}
'send_email', kwargs={'course_id': str(self.course.id)}
)
self.success_content = {
'course_id': six.text_type(self.course.id),
'course_id': str(self.course.id),
'success': True,
}
def tearDown(self):
super(EmailSendFromDashboardTestCase, self).tearDown() # lint-amnesty, pylint: disable=super-with-arguments
super().tearDown()
BulkEmailFlag.objects.all().delete()
class SendEmailWithMockedUgettextMixin(object):
class SendEmailWithMockedUgettextMixin:
"""
Mock uggetext for EmailSendFromDashboardTestCase.
"""
@@ -161,7 +158,7 @@ class SendEmailWithMockedUgettextMixin(object):
>>> mock_ugettext('Hello') == 'AR Hello'
"""
return u'{lang} {text}'.format(
return '{lang} {text}'.format(
lang=get_language().upper(),
text=text,
)
@@ -214,8 +211,8 @@ class LocalizedFromAddressCourseLangTestCase(SendEmailWithMockedUgettextMixin, E
"""
Creates a different course.
"""
super(LocalizedFromAddressCourseLangTestCase, cls).setUpClass()
course_title = u"ẗëṡẗ イэ"
super().setUpClass()
course_title = "ẗëṡẗ イэ"
cls.course = CourseFactory.create(
display_name=course_title,
language='ar',
@@ -288,11 +285,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
# the 1 is for the instructor in this test and others
assert len(mail.outbox) == (1 + len(self.staff))
six.assertCountEqual(
self,
[e.to[0] for e in mail.outbox],
[self.instructor.email] + [s.email for s in self.staff]
)
assert len([e.to[0] for e in mail.outbox]) == len([self.instructor.email] + [s.email for s in self.staff])
def test_send_to_cohort(self):
"""
@@ -303,18 +296,14 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
add_user_to_cohort(cohort.course_user_group, student.username)
test_email = {
'action': 'Send email',
'send_to': '["cohort:{}"]'.format(cohort.course_user_group.name),
'send_to': f'["cohort:{cohort.course_user_group.name}"]',
'subject': 'test subject for cohort',
'message': 'test message for cohort'
}
response = self.client.post(self.send_mail_url, test_email)
assert json.loads(response.content.decode('utf-8')) == self.success_content
six.assertCountEqual(
self,
[e.to[0] for e in mail.outbox],
[s.email for s in self.students]
)
assert len([e.to[0] for e in mail.outbox]) == len([s.email for s in self.students])
def test_send_to_cohort_unenrolled(self):
"""
@@ -326,7 +315,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
add_user_to_cohort(cohort.course_user_group, student.username)
test_email = {
'action': 'Send email',
'send_to': '["cohort:{}"]'.format(cohort.course_user_group.name),
'send_to': f'["cohort:{cohort.course_user_group.name}"]',
'subject': 'test subject for cohort',
'message': 'test message for cohort'
}
@@ -342,7 +331,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
"""
CourseMode.objects.create(mode_slug='test', course_id=self.course.id)
for student in self.students:
update_enrollment(student, six.text_type(self.course.id), 'test')
update_enrollment(student, str(self.course.id), 'test')
test_email = {
'action': 'Send email',
'send_to': '["track:test"]',
@@ -352,11 +341,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
response = self.client.post(self.send_mail_url, test_email)
assert json.loads(response.content.decode('utf-8')) == self.success_content
six.assertCountEqual(
self,
[e.to[0] for e in mail.outbox],
[s.email for s in self.students]
)
assert len([e.to[0] for e in mail.outbox]) == len([s.email for s in self.students])
def test_send_to_track_other_enrollments(self):
"""
@@ -367,7 +352,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
# Create a mode and designate an enrolled user to be placed in that mode
CourseMode.objects.create(mode_slug='test_mode', course_id=self.course.id)
test_mode_student = self.students[0]
update_enrollment(test_mode_student, six.text_type(self.course.id), 'test_mode')
update_enrollment(test_mode_student, str(self.course.id), 'test_mode')
# Take another user already enrolled in the course, then enroll them in
# another course but in that same test mode
@@ -378,7 +363,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
user=test_mode_student_other_course,
course_id=other_course.id
)
update_enrollment(test_mode_student_other_course, six.text_type(other_course.id), 'test_mode')
update_enrollment(test_mode_student_other_course, str(other_course.id), 'test_mode')
# Send the emails...
test_email = {
@@ -411,11 +396,8 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
# the 1 is for the instructor
assert len(mail.outbox) == ((1 + len(self.staff)) + len(self.students))
six.assertCountEqual(
self,
[e.to[0] for e in mail.outbox],
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
)
assert len([e.to[0] for e in mail.outbox]) == \
len([self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students])
@override_settings(BULK_EMAIL_JOB_SIZE_THRESHOLD=1)
def test_send_to_all_high_queue(self):
@@ -458,7 +440,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
Make sure email (with Unicode characters) send to all goes there.
"""
uni_subject = u'téśt śúbjéćt főŕ áĺĺ'
uni_subject = 'téśt śúbjéćt főŕ áĺĺ'
test_email = {
'action': 'Send email',
'send_to': '["myself", "staff", "learners"]',
@@ -469,11 +451,8 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
assert json.loads(response.content.decode('utf-8')) == self.success_content
assert len(mail.outbox) == ((1 + len(self.staff)) + len(self.students))
six.assertCountEqual(
self,
[e.to[0] for e in mail.outbox],
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
)
assert len([e.to[0] for e in mail.outbox]) ==\
len([self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students])
assert mail.outbox[0].subject == uni_subject
def test_unicode_students_send_to_all(self):
@@ -482,7 +461,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
"""
# Create a student with Unicode in their first & last names
unicode_user = UserFactory(first_name=u'Ⓡⓞⓑⓞⓣ', last_name=u'ՇﻉรՇ')
unicode_user = UserFactory(first_name='Ⓡⓞⓑⓞⓣ', last_name='ՇﻉรՇ')
CourseEnrollmentFactory.create(user=unicode_user, course_id=self.course.id)
self.students.append(unicode_user)
@@ -497,11 +476,8 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
assert len(mail.outbox) == ((1 + len(self.staff)) + len(self.students))
six.assertCountEqual(
self,
[e.to[0] for e in mail.outbox],
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
)
assert len([e.to[0] for e in mail.outbox]) ==\
len([self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students])
@override_settings(BULK_EMAIL_DEFAULT_FROM_EMAIL="no-reply@courseupdates.edx.org")
def test_long_course_display_name(self):
@@ -519,7 +495,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
# make display_name that's longer than 320 characters when encoded
# to ascii and escaped, but shorter than 320 unicode characters
long_name = u"Финансовое программирование и политика, часть 1: макроэкономические счета и анализ"
long_name = "Финансовое программирование и политика, часть 1: макроэкономические счета и анализ"
course = CourseFactory.create(
display_name=long_name,
@@ -545,7 +521,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
assert len(unexpected_from_addr) == 137
self.login_as_user(instructor)
send_mail_url = reverse('send_email', kwargs={'course_id': six.text_type(course.id)})
send_mail_url = reverse('send_email', kwargs={'course_id': str(course.id)})
response = self.client.post(send_mail_url, test_email)
assert json.loads(response.content.decode('utf-8'))['success']
@@ -553,7 +529,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
from_email = mail.outbox[0].from_email
expected_from_addr = (
u'"{course_name}" Course Staff <{course_name}-no-reply@courseupdates.edx.org>'
'"{course_name}" Course Staff <{course_name}-no-reply@courseupdates.edx.org>'
).format(course_name=course.id.course)
assert from_email == expected_from_addr
@@ -596,7 +572,7 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
[s.email for s in self.staff] +
[s.email for s in self.students] +
[s.email for s in added_users if s not in optouts])
six.assertCountEqual(self, outbox_contents, should_send_contents)
assert len(outbox_contents) == len(should_send_contents)
def test_unsubscribe_link_in_email(self):
"""
@@ -617,8 +593,8 @@ class TestEmailSendFromDashboardMockedHtmlToText(EmailSendFromDashboardTestCase)
plain_template = m.body
html_template = m.alternatives[0][0]
assert u'bulk_email/email/optout/' in plain_template
assert u'bulk_email/email/optout/' in html_template
assert 'bulk_email/email/optout/' in plain_template
assert 'bulk_email/email/optout/' in html_template
@skipIf(os.environ.get("TRAVIS") == 'true', "Skip this test in Travis CI.")
@@ -635,7 +611,7 @@ class TestEmailSendFromDashboard(EmailSendFromDashboardTestCase):
Make sure email (with Unicode characters) send to all goes there.
"""
uni_message = u'ẗëṡẗ ṁëṡṡäġë ḟöṛ äḷḷ イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ fоѓ аll'
uni_message = 'ẗëṡẗ ṁëṡṡäġë ḟöṛ äḷḷ イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ fоѓ аll'
test_email = {
'action': 'Send email',
'send_to': '["myself", "staff", "learners"]',
@@ -646,11 +622,8 @@ class TestEmailSendFromDashboard(EmailSendFromDashboardTestCase):
assert json.loads(response.content.decode('utf-8')) == self.success_content
assert len(mail.outbox) == ((1 + len(self.staff)) + len(self.students))
six.assertCountEqual(
self,
[e.to[0] for e in mail.outbox],
[self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students]
)
assert len([e.to[0] for e in mail.outbox]) ==\
len([self.instructor.email] + [s.email for s in self.staff] + [s.email for s in self.students])
message_body = mail.outbox[0].body
assert uni_message in message_body
@@ -666,8 +639,8 @@ class TestCourseEmailContext(SharedModuleStoreTestCase):
"""
Create a course shared by all tests.
"""
super(TestCourseEmailContext, cls).setUpClass()
cls.course_title = u"Финансовое программирование и политика, часть 1: макроэкономические счета и анализ"
super().setUpClass()
cls.course_title = "Финансовое программирование и политика, часть 1: макроэкономические счета и анализ"
cls.course_org = 'IMF'
cls.course_number = "FPP.1x"
cls.course_run = "2016"

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Unit tests for handling email sending errors
"""
@@ -7,6 +6,8 @@ Unit tests for handling email sending errors
import json
from itertools import cycle
from smtplib import SMTPConnectError, SMTPDataError, SMTPServerDisconnected
from unittest.mock import Mock, patch
import pytest
import ddt
from celery.states import RETRY, SUCCESS
@@ -14,11 +15,9 @@ from django.conf import settings
from django.core.management import call_command
from django.db import DatabaseError
from django.urls import reverse
from mock import Mock, patch
from opaque_keys.edx.locator import CourseLocator
from six import text_type
from six.moves import range
from common.djangoapps.student.tests.factories import AdminFactory, CourseEnrollmentFactory, UserFactory
from lms.djangoapps.bulk_email.models import SEND_TO_MYSELF, BulkEmailFlag, CourseEmail
from lms.djangoapps.bulk_email.tasks import perform_delegate_email_batches, send_course_email
from lms.djangoapps.courseware.exceptions import CourseRunNotFound
@@ -31,7 +30,6 @@ from lms.djangoapps.instructor_task.subtasks import (
initialize_subtask_info,
update_subtask_status
)
from common.djangoapps.student.tests.factories import AdminFactory, CourseEnrollmentFactory, UserFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
@@ -51,29 +49,29 @@ class TestEmailErrors(ModuleStoreTestCase):
ENABLED_CACHES = ['default', 'mongo_metadata_inheritance', 'loc_cache']
def setUp(self):
super(TestEmailErrors, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
course_title = u"ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
super().setUp()
course_title = "ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
self.course = CourseFactory.create(display_name=course_title)
self.instructor = AdminFactory.create()
self.client.login(username=self.instructor.username, password="test")
# load initial content (since we don't run migrations as part of tests):
call_command("loaddata", "course_email_template.json")
self.url = reverse('instructor_dashboard', kwargs={'course_id': text_type(self.course.id)})
self.send_mail_url = reverse('send_email', kwargs={'course_id': text_type(self.course.id)})
self.url = reverse('instructor_dashboard', kwargs={'course_id': str(self.course.id)})
self.send_mail_url = reverse('send_email', kwargs={'course_id': str(self.course.id)})
self.success_content = {
'course_id': text_type(self.course.id),
'course_id': str(self.course.id),
'success': True,
}
@classmethod
def setUpClass(cls):
super(TestEmailErrors, cls).setUpClass()
super().setUpClass()
BulkEmailFlag.objects.create(enabled=True, require_course_email_auth=False)
@classmethod
def tearDownClass(cls):
super(TestEmailErrors, cls).tearDownClass()
super().tearDownClass()
BulkEmailFlag.objects.all().delete()
@patch('lms.djangoapps.bulk_email.tasks.get_connection', autospec=True)
@@ -226,7 +224,7 @@ class TestEmailErrors(ModuleStoreTestCase):
email = CourseEmail.create( # pylint: disable=unused-variable
self.course.id,
self.instructor,
[u"{}:IDONTEXIST".format(target_type)],
[f"{target_type}:IDONTEXIST"],
"re: subject",
"dummy body goes here"
)

View File

@@ -1,11 +1,9 @@
# -*- coding: utf-8 -*-
"""
Unit tests for bulk-email-related forms.
"""
from opaque_keys.edx.locator import CourseLocator
from six import text_type
from lms.djangoapps.bulk_email.api import is_bulk_email_feature_enabled
from lms.djangoapps.bulk_email.forms import CourseAuthorizationAdminForm, CourseEmailTemplateForm
@@ -18,20 +16,20 @@ class CourseAuthorizationFormTest(ModuleStoreTestCase):
"""Test the CourseAuthorizationAdminForm form for Mongo-backed courses."""
def setUp(self):
super(CourseAuthorizationFormTest, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
course_title = u"ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
super().setUp()
course_title = "ẗëṡẗ title イ乇丂イ ᄊ乇丂丂ムg乇 キo尺 ムレレ тэѕт мэѕѕаБэ"
self.course = CourseFactory.create(display_name=course_title)
BulkEmailFlag.objects.create(enabled=True, require_course_email_auth=True)
def tearDown(self):
super(CourseAuthorizationFormTest, self).tearDown() # lint-amnesty, pylint: disable=super-with-arguments
super().tearDown()
BulkEmailFlag.objects.all().delete()
def test_authorize_mongo_course(self):
# Initially course shouldn't be authorized
assert not is_bulk_email_feature_enabled(self.course.id)
# Test authorizing the course, which should totally work
form_data = {'course_id': text_type(self.course.id), 'email_enabled': True}
form_data = {'course_id': str(self.course.id), 'email_enabled': True}
form = CourseAuthorizationAdminForm(data=form_data)
# Validation should work
assert form.is_valid()
@@ -43,7 +41,7 @@ class CourseAuthorizationFormTest(ModuleStoreTestCase):
# Initially course shouldn't be authorized
assert not is_bulk_email_feature_enabled(self.course.id)
# Test authorizing the course, which should totally work
form_data = {'course_id': text_type(self.course.id), 'email_enabled': True}
form_data = {'course_id': str(self.course.id), 'email_enabled': True}
form = CourseAuthorizationAdminForm(data=form_data)
# Validation should work
assert form.is_valid()
@@ -52,7 +50,7 @@ class CourseAuthorizationFormTest(ModuleStoreTestCase):
assert is_bulk_email_feature_enabled(self.course.id)
# Now make a new course authorization with the same course id that tries to turn email off
form_data = {'course_id': text_type(self.course.id), 'email_enabled': False}
form_data = {'course_id': str(self.course.id), 'email_enabled': False}
form = CourseAuthorizationAdminForm(data=form_data)
# Validation should not work because course_id field is unique
assert not form.is_valid()
@@ -69,15 +67,15 @@ class CourseAuthorizationFormTest(ModuleStoreTestCase):
def test_form_typo(self):
# Munge course id
bad_id = CourseLocator(u'Broken{}'.format(self.course.id.org), 'hello', self.course.id.run + '_typo')
bad_id = CourseLocator(f'Broken{self.course.id.org}', 'hello', self.course.id.run + '_typo')
form_data = {'course_id': text_type(bad_id), 'email_enabled': True}
form_data = {'course_id': str(bad_id), 'email_enabled': True}
form = CourseAuthorizationAdminForm(data=form_data)
# Validation shouldn't work
assert not form.is_valid()
msg = u'Course not found.'
msg += u' Entered course id was: "{0}".'.format(text_type(bad_id))
msg = 'Course not found.'
msg += ' Entered course id was: "{}".'.format(str(bad_id))
assert msg == form._errors['course_id'][0] # pylint: disable=protected-access
with self.assertRaisesRegex(
@@ -92,8 +90,8 @@ class CourseAuthorizationFormTest(ModuleStoreTestCase):
# Validation shouldn't work
assert not form.is_valid()
msg = u'Course id invalid.'
msg += u' Entered course id was: "asd::**!@#$%^&*())//foobar!!".'
msg = 'Course id invalid.'
msg += ' Entered course id was: "asd::**!@#$%^&*())//foobar!!".'
assert msg == form._errors['course_id'][0] # pylint: disable=protected-access
with self.assertRaisesRegex(
@@ -110,7 +108,7 @@ class CourseAuthorizationFormTest(ModuleStoreTestCase):
assert not form.is_valid()
error_msg = form._errors['course_id'][0] # pylint: disable=protected-access
assert u'Entered course id was: "{0}".'.format(self.course.id.run) in error_msg
assert f'Entered course id was: "{self.course.id.run}".' in error_msg
with self.assertRaisesRegex(
ValueError,

View File

@@ -4,15 +4,17 @@ Unit tests for bulk-email-related models.
import datetime
from unittest.mock import Mock, patch
import pytest
import ddt
from django.core.management import call_command
from django.test import TestCase
from mock import Mock, patch
from opaque_keys.edx.keys import CourseKey
from pytz import UTC
from common.djangoapps.course_modes.models import CourseMode
from common.djangoapps.student.tests.factories import UserFactory
from lms.djangoapps.bulk_email.api import is_bulk_email_feature_enabled
from lms.djangoapps.bulk_email.models import (
SEND_TO_COHORT,
@@ -22,11 +24,9 @@ from lms.djangoapps.bulk_email.models import (
CourseAuthorization,
CourseEmail,
CourseEmailTemplate,
Optout,
Optout
)
from common.djangoapps.course_modes.models import CourseMode
from openedx.core.djangoapps.course_groups.models import CourseCohort
from common.djangoapps.student.tests.factories import UserFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
@@ -118,7 +118,7 @@ class CourseEmailTest(ModuleStoreTestCase):
mode_display_name = free_mode.capitalize
course_id = course.id
sender = UserFactory.create()
to_option = 'track:{}'.format(free_mode)
to_option = f'track:{free_mode}'
subject = "dummy subject"
html_message = "<html>dummy message</html>"
CourseMode.objects.create(
@@ -131,8 +131,8 @@ class CourseEmailTest(ModuleStoreTestCase):
assert len(email.targets.all()) == 1
target = email.targets.all()[0]
assert target.target_type == SEND_TO_TRACK
assert target.short_display() == 'track-{}'.format(free_mode)
assert target.long_display() == u'Course mode: {}'.format(mode_display_name)
assert target.short_display() == f'track-{free_mode}'
assert target.long_display() == f'Course mode: {mode_display_name}'
def test_cohort_target(self):
course_id = CourseKey.from_string('abc/123/doremi')
@@ -176,7 +176,7 @@ class CourseEmailTemplateTest(TestCase):
"""Test the CourseEmailTemplate model."""
def setUp(self):
super(CourseEmailTemplateTest, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
super().setUp()
# load initial content (since we don't run migrations as part of tests):
call_command("loaddata", "course_email_template.json")
@@ -219,8 +219,8 @@ class CourseEmailTemplateTest(TestCase):
template = CourseEmailTemplate.get_template(name="branded.template")
assert template.html_template is not None
assert template.plain_template is not None
assert u'THIS IS A BRANDED HTML TEMPLATE' in template.html_template
assert u'THIS IS A BRANDED TEXT TEMPLATE' in template.plain_template
assert 'THIS IS A BRANDED HTML TEMPLATE' in template.html_template
assert 'THIS IS A BRANDED TEXT TEMPLATE' in template.plain_template
def test_render_html_without_context(self):
template = CourseEmailTemplate.get_template()
@@ -249,7 +249,7 @@ class CourseEmailTemplateTest(TestCase):
template = CourseEmailTemplate.get_template()
context = self._add_xss_fields(self._get_sample_html_context())
message = template.render_htmltext(
u"Dear %%USER_FULLNAME%%, thanks for enrolling in %%COURSE_DISPLAY_NAME%%.", context
"Dear %%USER_FULLNAME%%, thanks for enrolling in %%COURSE_DISPLAY_NAME%%.", context
)
assert '<script>' not in message
assert '&lt;script&gt;alert(&#39;Course Title!&#39;);&lt;/alert&gt;' in message
@@ -264,7 +264,7 @@ class CourseEmailTemplateTest(TestCase):
template = CourseEmailTemplate.get_template()
context = self._add_xss_fields(self._get_sample_plain_context())
message = template.render_plaintext(
u"Dear %%USER_FULLNAME%%, thanks for enrolling in %%COURSE_DISPLAY_NAME%%.", context
"Dear %%USER_FULLNAME%%, thanks for enrolling in %%COURSE_DISPLAY_NAME%%.", context
)
assert '&lt;script&gt;' not in message
assert context['course_title'] in message
@@ -275,7 +275,7 @@ class CourseAuthorizationTest(TestCase):
"""Test the CourseAuthorization model."""
def tearDown(self):
super(CourseAuthorizationTest, self).tearDown() # lint-amnesty, pylint: disable=super-with-arguments
super().tearDown()
BulkEmailFlag.objects.all().delete()
def test_creation_auth_on(self):

View File

@@ -4,16 +4,15 @@ Unit tests for student optouts from course email
import json
from unittest.mock import Mock, patch
from django.core import mail
from django.core.management import call_command
from django.urls import reverse
from mock import Mock, patch
from six import text_type
from common.djangoapps.student.tests.factories import AdminFactory, CourseEnrollmentFactory, UserFactory
from lms.djangoapps.bulk_email.models import BulkEmailFlag, Optout
from lms.djangoapps.bulk_email.signals import force_optout_all
from common.djangoapps.student.tests.factories import AdminFactory, CourseEnrollmentFactory, UserFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
@@ -25,7 +24,7 @@ class TestOptoutCourseEmailsBySignal(ModuleStoreTestCase):
"""
def setUp(self):
super(TestOptoutCourseEmailsBySignal, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
super().setUp()
self.course = CourseFactory.create(run='testcourse1', display_name="Test Course Title")
self.instructor = AdminFactory.create()
self.student = UserFactory.create()
@@ -36,9 +35,9 @@ class TestOptoutCourseEmailsBySignal(ModuleStoreTestCase):
self.client.login(username=self.student.username, password="test")
self.send_mail_url = reverse('send_email', kwargs={'course_id': text_type(self.course.id)})
self.send_mail_url = reverse('send_email', kwargs={'course_id': str(self.course.id)})
self.success_content = {
'course_id': text_type(self.course.id),
'course_id': str(self.course.id),
'success': True,
}
BulkEmailFlag.objects.create(enabled=True, require_course_email_auth=False)
@@ -55,7 +54,7 @@ class TestOptoutCourseEmailsBySignal(ModuleStoreTestCase):
Navigate to the instructor dash's email view to send bulk email
"""
# Pull up email view on instructor dashboard
url = reverse('instructor_dashboard', kwargs={'course_id': text_type(self.course.id)})
url = reverse('instructor_dashboard', kwargs={'course_id': str(self.course.id)})
response = self.client.get(url)
email_section = '<div class="vert-left send-email" id="section-send-email">'

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Unit tests for LMS instructor-initiated background tasks.
@@ -11,6 +10,7 @@ paths actually work.
import json
from itertools import chain, cycle, repeat
from smtplib import SMTPAuthenticationError, SMTPConnectError, SMTPDataError, SMTPServerDisconnected
from unittest.mock import Mock, patch
from uuid import uuid4
import pytest
from boto.exception import AWSConnectionError
@@ -28,18 +28,17 @@ from boto.ses.exceptions import (
from celery.states import FAILURE, SUCCESS
from django.conf import settings
from django.core.management import call_command
from mock import Mock, patch
from opaque_keys.edx.locator import CourseLocator
from six.moves import range
from lms.djangoapps.bulk_email.tasks import _get_course_email_context
from lms.djangoapps.instructor_task.models import InstructorTask
from lms.djangoapps.instructor_task.subtasks import SubtaskStatus, update_subtask_status
from lms.djangoapps.instructor_task.tasks import send_bulk_course_email
from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory
from lms.djangoapps.instructor_task.tests.test_base import InstructorTaskCourseTestCase
from xmodule.modulestore.tests.factories import CourseFactory
from ..models import SEND_TO_LEARNERS, SEND_TO_MYSELF, SEND_TO_STAFF, CourseEmail, Optout
from lms.djangoapps.bulk_email.tasks import _get_course_email_context # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.instructor_task.models import InstructorTask # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.instructor_task.subtasks import SubtaskStatus, update_subtask_status # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.instructor_task.tasks import send_bulk_course_email # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.instructor_task.tests.factories import InstructorTaskFactory # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.instructor_task.tests.test_base import InstructorTaskCourseTestCase # lint-amnesty, pylint: disable=wrong-import-order
from xmodule.modulestore.tests.factories import CourseFactory # lint-amnesty, pylint: disable=wrong-import-order
class TestTaskFailure(Exception):
@@ -80,7 +79,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
"""Tests instructor task that send bulk email."""
def setUp(self):
super(TestBulkEmailInstructorTask, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
super().setUp()
self.initialize_course()
self.instructor = self.create_instructor('instructor')
@@ -154,7 +153,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
assert len(task_id_list) == 1
task_id = task_id_list[0]
subtask_status = subtask_status_info.get(task_id)
print(u"Testing subtask status: {}".format(subtask_status))
print(f"Testing subtask status: {subtask_status}")
assert subtask_status.get('task_id') == task_id
assert subtask_status.get('attempted') == (succeeded + failed)
assert subtask_status.get('succeeded') == succeeded
@@ -291,7 +290,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
students = [self.create_student('robot%d' % i) for i in range(num_emails)]
for student in students[:emails_with_non_ascii_chars]:
student.email = '{username}@tesá.com'.format(username=student.username)
student.email = f'{student.username}@tesá.com'
student.save()
total = num_emails + num_of_course_instructors
@@ -451,7 +450,7 @@ class TestBulkEmailInstructorTask(InstructorTaskCourseTestCase):
def test_bulk_emails_with_unicode_course_image_name(self):
# Test bulk email with unicode characters in course image name
course_image = u'在淡水測試.jpg'
course_image = '在淡水測試.jpg'
self.course = CourseFactory.create(course_image=course_image)
num_emails = 2

View File

@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""
Test the bulk email opt out view.
"""
@@ -10,15 +9,13 @@ from django.test.client import RequestFactory
from django.test.utils import override_settings
from django.urls import reverse
from common.djangoapps.student.tests.factories import UserFactory
from lms.djangoapps.bulk_email.models import Optout
from lms.djangoapps.bulk_email.views import opt_out_email_updates
from six import text_type # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.discussion.notification_prefs.views import UsernameCipher
from openedx.core.lib.tests import attr
from common.djangoapps.student.tests.factories import UserFactory
from xmodule.modulestore.tests.factories import CourseFactory
from xmodule.modulestore.tests.django_utils import ModuleStoreTestCase
from xmodule.modulestore.tests.factories import CourseFactory
@attr(shard=1)
@@ -29,12 +26,12 @@ class OptOutEmailUpdatesViewTest(ModuleStoreTestCase):
Check the opt out email functionality.
"""
def setUp(self):
super(OptOutEmailUpdatesViewTest, self).setUp() # lint-amnesty, pylint: disable=super-with-arguments
super().setUp()
self.user = UserFactory.create(username="testuser1", email='test@example.com')
self.course = CourseFactory.create(run='testcourse1', display_name='Test Course Title')
self.token = UsernameCipher.encrypt('testuser1')
self.request_factory = RequestFactory()
self.url = reverse('bulk_email_opt_out', args=[self.token, text_type(self.course.id)])
self.url = reverse('bulk_email_opt_out', args=[self.token, str(self.course.id)])
# Ensure we start with no opt-out records
assert Optout.objects.count() == 0

View File

@@ -2,8 +2,8 @@
URLs for bulk_email app
"""
from django.conf.urls import url
from django.conf import settings
from django.conf.urls import url
from . import views

View File

@@ -5,22 +5,15 @@ Views to support bulk email functionalities like opt-out.
import logging
from six import text_type
from django.contrib.auth.models import User # lint-amnesty, pylint: disable=imported-auth-user
from django.http import Http404
from opaque_keys import InvalidKeyError
from opaque_keys.edx.keys import CourseKey
from common.djangoapps.edxmako.shortcuts import render_to_response
from lms.djangoapps.bulk_email.models import Optout
from lms.djangoapps.courseware.courses import get_course_by_id
from common.djangoapps.edxmako.shortcuts import render_to_response
from lms.djangoapps.discussion.notification_prefs.views import (
UsernameCipher,
UsernameDecryptionException,
)
from opaque_keys import InvalidKeyError # lint-amnesty, pylint: disable=wrong-import-order
from opaque_keys.edx.keys import CourseKey # lint-amnesty, pylint: disable=wrong-import-order
from lms.djangoapps.discussion.notification_prefs.views import UsernameCipher, UsernameDecryptionException
log = logging.getLogger(__name__)
@@ -43,7 +36,7 @@ def opt_out_email_updates(request, token, course_id):
except UnicodeDecodeError:
raise Http404("base64url") # lint-amnesty, pylint: disable=raise-missing-from
except UsernameDecryptionException as exn:
raise Http404(text_type(exn)) # lint-amnesty, pylint: disable=raise-missing-from
raise Http404(str(exn)) # lint-amnesty, pylint: disable=raise-missing-from
except User.DoesNotExist:
raise Http404("username") # lint-amnesty, pylint: disable=raise-missing-from
except InvalidKeyError:
@@ -61,7 +54,7 @@ def opt_out_email_updates(request, token, course_id):
if request.method == 'POST' and unsub_check:
Optout.objects.get_or_create(user=user, course_id=course_key)
log.info(
u"User %s (%s) opted out of receiving emails from course %s",
"User %s (%s) opted out of receiving emails from course %s",
user.username,
user.email,
course_id,