Merge pull request #5528 from edx/dan-f/data-download-tech-debt
Student profile CSV background task tech debt
This commit is contained in:
@@ -146,3 +146,19 @@ class MembershipPage(PageObject):
|
||||
The first entry in the array is the title. Any further entries are the details.
|
||||
"""
|
||||
return self._get_cohort_messages("errors")
|
||||
|
||||
def select_data_download(self):
|
||||
"""
|
||||
Click on the link to the Data Download Page.
|
||||
"""
|
||||
self.q(css="a.link-cross-reference[data-section=data_download]").first.click()
|
||||
|
||||
|
||||
class DataDownloadPage(PageObject):
|
||||
"""
|
||||
Data Download section of the Instructor dashboard.
|
||||
"""
|
||||
url = None
|
||||
|
||||
def is_browser_on_page(self):
|
||||
return self.q(css='a[data-section=data_download].active-section').present
|
||||
|
||||
@@ -12,7 +12,7 @@ from .helpers import CohortTestMixin
|
||||
from ..helpers import UniqueCourseTest
|
||||
from ...fixtures.course import CourseFixture
|
||||
from ...pages.lms.auto_auth import AutoAuthPage
|
||||
from ...pages.lms.instructor_dashboard import InstructorDashboardPage
|
||||
from ...pages.lms.instructor_dashboard import InstructorDashboardPage, DataDownloadPage
|
||||
from ...pages.studio.settings_advanced import AdvancedSettingsPage
|
||||
|
||||
import uuid
|
||||
@@ -242,3 +242,16 @@ class CohortConfigurationTest(UniqueCourseTest, CohortTestMixin):
|
||||
}).count(),
|
||||
1
|
||||
)
|
||||
|
||||
def test_link_to_data_download(self):
|
||||
"""
|
||||
Scenario: a link is present from the cohort configuration in
|
||||
the instructor dashboard to the Data Download section.
|
||||
|
||||
Given I have a course with a cohort defined
|
||||
When I view the cohort in the LMS instructor dashboard
|
||||
There is a link to take me to the Data Download section of the Instructor Dashboard.
|
||||
"""
|
||||
self.membership_page.select_data_download()
|
||||
data_download_page = DataDownloadPage(self.browser)
|
||||
data_download_page.wait_for_page()
|
||||
|
||||
@@ -30,8 +30,8 @@ from instructor_task.tasks_helper import (
|
||||
rescore_problem_module_state,
|
||||
reset_attempts_module_state,
|
||||
delete_problem_module_state,
|
||||
push_grades_to_s3,
|
||||
push_students_csv_to_s3
|
||||
upload_grades_csv,
|
||||
upload_students_csv
|
||||
)
|
||||
from bulk_email.tasks import perform_delegate_email_batches
|
||||
|
||||
@@ -139,7 +139,7 @@ def calculate_grades_csv(entry_id, xmodule_instance_args):
|
||||
"""
|
||||
# Translators: This is a past-tense verb that is inserted into task progress messages as {action}.
|
||||
action_name = ugettext_noop('graded')
|
||||
task_fn = partial(push_grades_to_s3, xmodule_instance_args)
|
||||
task_fn = partial(upload_grades_csv, xmodule_instance_args)
|
||||
return run_main_task(entry_id, task_fn, action_name)
|
||||
|
||||
|
||||
@@ -151,5 +151,5 @@ def calculate_students_features_csv(entry_id, xmodule_instance_args):
|
||||
"""
|
||||
# Translators: This is a past-tense verb that is inserted into task progress messages as {action}.
|
||||
action_name = ugettext_noop('generated')
|
||||
task_fn = partial(push_students_csv_to_s3, xmodule_instance_args)
|
||||
task_fn = partial(upload_students_csv, xmodule_instance_args)
|
||||
return run_main_task(entry_id, task_fn, action_name)
|
||||
|
||||
@@ -148,6 +148,49 @@ def _get_current_task():
|
||||
return current_task
|
||||
|
||||
|
||||
class TaskProgress(object):
|
||||
"""
|
||||
Encapsulates the current task's progress by keeping track of
|
||||
'attempted', 'succeeded', 'skipped', 'failed', 'total',
|
||||
'action_name', and 'duration_ms' values.
|
||||
"""
|
||||
def __init__(self, action_name, total, start_time):
|
||||
self.action_name = action_name
|
||||
self.total = total
|
||||
self.start_time = start_time
|
||||
self.attempted = 0
|
||||
self.succeeded = 0
|
||||
self.skipped = 0
|
||||
self.failed = 0
|
||||
|
||||
def update_task_state(self, extra_meta=None):
|
||||
"""
|
||||
Update the current celery task's state to the progress state
|
||||
specified by the current object. Returns the progress
|
||||
dictionary for use by `run_main_task` and
|
||||
`BaseInstructorTask.on_success`.
|
||||
|
||||
Arguments:
|
||||
extra_meta (dict): Extra metadata to pass to `update_state`
|
||||
|
||||
Returns:
|
||||
dict: The current task's progress dict
|
||||
"""
|
||||
progress_dict = {
|
||||
'action_name': self.action_name,
|
||||
'attempted': self.attempted,
|
||||
'succeeded': self.succeeded,
|
||||
'skipped': self.skipped,
|
||||
'failed': self.failed,
|
||||
'total': self.total,
|
||||
'duration_ms': int((time() - self.start_time) * 1000),
|
||||
}
|
||||
if extra_meta is not None:
|
||||
progress_dict.update(extra_meta)
|
||||
_get_current_task().update_state(state=PROGRESS, meta=progress_dict)
|
||||
return progress_dict
|
||||
|
||||
|
||||
def run_main_task(entry_id, task_fcn, action_name):
|
||||
"""
|
||||
Applies the `task_fcn` to the arguments defined in `entry_id` InstructorTask.
|
||||
@@ -243,9 +286,7 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
|
||||
result object.
|
||||
|
||||
"""
|
||||
# get start time for task:
|
||||
start_time = time()
|
||||
|
||||
usage_key = course_id.make_usage_key_from_deprecated_string(task_input.get('problem_url'))
|
||||
student_identifier = task_input.get('student')
|
||||
|
||||
@@ -272,30 +313,11 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
|
||||
if filter_fcn is not None:
|
||||
modules_to_update = filter_fcn(modules_to_update)
|
||||
|
||||
# perform the main loop
|
||||
num_attempted = 0
|
||||
num_succeeded = 0
|
||||
num_skipped = 0
|
||||
num_failed = 0
|
||||
num_total = modules_to_update.count()
|
||||
task_progress = TaskProgress(action_name, modules_to_update.count(), start_time)
|
||||
task_progress.update_task_state()
|
||||
|
||||
def get_task_progress():
|
||||
"""Return a dict containing info about current task"""
|
||||
current_time = time()
|
||||
progress = {'action_name': action_name,
|
||||
'attempted': num_attempted,
|
||||
'succeeded': num_succeeded,
|
||||
'skipped': num_skipped,
|
||||
'failed': num_failed,
|
||||
'total': num_total,
|
||||
'duration_ms': int((current_time - start_time) * 1000),
|
||||
}
|
||||
return progress
|
||||
|
||||
task_progress = get_task_progress()
|
||||
_get_current_task().update_state(state=PROGRESS, meta=task_progress)
|
||||
for module_to_update in modules_to_update:
|
||||
num_attempted += 1
|
||||
task_progress.attempted += 1
|
||||
# There is no try here: if there's an error, we let it throw, and the task will
|
||||
# be marked as FAILED, with a stack trace.
|
||||
with dog_stats_api.timer('instructor_tasks.module.time.step', tags=[u'action:{name}'.format(name=action_name)]):
|
||||
@@ -303,19 +325,15 @@ def perform_module_state_update(update_fcn, filter_fcn, _entry_id, course_id, ta
|
||||
if update_status == UPDATE_STATUS_SUCCEEDED:
|
||||
# If the update_fcn returns true, then it performed some kind of work.
|
||||
# Logging of failures is left to the update_fcn itself.
|
||||
num_succeeded += 1
|
||||
task_progress.succeeded += 1
|
||||
elif update_status == UPDATE_STATUS_FAILED:
|
||||
num_failed += 1
|
||||
task_progress.failed += 1
|
||||
elif update_status == UPDATE_STATUS_SKIPPED:
|
||||
num_skipped += 1
|
||||
task_progress.skipped += 1
|
||||
else:
|
||||
raise UpdateProblemModuleStateError("Unexpected update_status returned: {}".format(update_status))
|
||||
|
||||
# update task status:
|
||||
task_progress = get_task_progress()
|
||||
_get_current_task().update_state(state=PROGRESS, meta=task_progress)
|
||||
|
||||
return task_progress
|
||||
return task_progress.update_task_state()
|
||||
|
||||
|
||||
def _get_task_id_from_xmodule_args(xmodule_instance_args):
|
||||
@@ -505,7 +523,7 @@ def upload_csv_to_report_store(rows, csv_name, course_id, timestamp):
|
||||
)
|
||||
|
||||
|
||||
def push_grades_to_s3(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
|
||||
def upload_grades_csv(_xmodule_instance_args, _entry_id, course_id, _task_input, action_name):
|
||||
"""
|
||||
For a given `course_id`, generate a grades CSV file for all students that
|
||||
are enrolled, and store using a `ReportStore`. Once created, the files can
|
||||
@@ -518,45 +536,26 @@ def push_grades_to_s3(_xmodule_instance_args, _entry_id, course_id, _task_input,
|
||||
make a more general CSVDoc class instead of building out the rows like we
|
||||
do here.
|
||||
"""
|
||||
start_time = datetime.now(UTC)
|
||||
start_time = time()
|
||||
start_date = datetime.now(UTC)
|
||||
status_interval = 100
|
||||
|
||||
enrolled_students = CourseEnrollment.users_enrolled_in(course_id)
|
||||
num_total = enrolled_students.count()
|
||||
num_attempted = 0
|
||||
num_succeeded = 0
|
||||
num_failed = 0
|
||||
curr_step = "Calculating Grades"
|
||||
|
||||
def update_task_progress():
|
||||
"""Return a dict containing info about current task"""
|
||||
current_time = datetime.now(UTC)
|
||||
progress = {
|
||||
'action_name': action_name,
|
||||
'attempted': num_attempted,
|
||||
'succeeded': num_succeeded,
|
||||
'failed': num_failed,
|
||||
'total': num_total,
|
||||
'duration_ms': int((current_time - start_time).total_seconds() * 1000),
|
||||
'step': curr_step,
|
||||
}
|
||||
_get_current_task().update_state(state=PROGRESS, meta=progress)
|
||||
|
||||
return progress
|
||||
task_progress = TaskProgress(action_name, enrolled_students.count(), start_time)
|
||||
|
||||
# Loop over all our students and build our CSV lists in memory
|
||||
header = None
|
||||
rows = []
|
||||
err_rows = [["id", "username", "error_msg"]]
|
||||
current_step = {'step': 'Calculating Grades'}
|
||||
for student, gradeset, err_msg in iterate_grades_for(course_id, enrolled_students):
|
||||
# Periodically update task status (this is a cache write)
|
||||
if num_attempted % status_interval == 0:
|
||||
update_task_progress()
|
||||
num_attempted += 1
|
||||
if task_progress.attempted % status_interval == 0:
|
||||
task_progress.update_task_state(extra_meta=current_step)
|
||||
task_progress.attempted += 1
|
||||
|
||||
if gradeset:
|
||||
# We were able to successfully grade this student for this course.
|
||||
num_succeeded += 1
|
||||
task_progress.succeeded += 1
|
||||
if not header:
|
||||
# Encode the header row in utf-8 encoding in case there are unicode characters
|
||||
header = [section['label'].encode('utf-8') for section in gradeset[u'section_breakdown']]
|
||||
@@ -578,37 +577,50 @@ def push_grades_to_s3(_xmodule_instance_args, _entry_id, course_id, _task_input,
|
||||
rows.append([student.id, student.email, student.username, gradeset['percent']] + row_percents)
|
||||
else:
|
||||
# An empty gradeset means we failed to grade a student.
|
||||
num_failed += 1
|
||||
task_progress.failed += 1
|
||||
err_rows.append([student.id, student.username, err_msg])
|
||||
|
||||
# By this point, we've got the rows we're going to stuff into our CSV files.
|
||||
curr_step = "Uploading CSVs"
|
||||
update_task_progress()
|
||||
current_step = {'step': 'Uploading CSVs'}
|
||||
task_progress.update_task_state(extra_meta=current_step)
|
||||
|
||||
# Perform the actual upload
|
||||
upload_csv_to_report_store(rows, 'grade_report', course_id, start_time)
|
||||
upload_csv_to_report_store(rows, 'grade_report', course_id, start_date)
|
||||
|
||||
# If there are any error rows (don't count the header), write them out as well
|
||||
if len(err_rows) > 1:
|
||||
upload_csv_to_report_store(err_rows, 'grade_report_err', course_id, start_time)
|
||||
upload_csv_to_report_store(err_rows, 'grade_report_err', course_id, start_date)
|
||||
|
||||
# One last update before we close out...
|
||||
return update_task_progress()
|
||||
return task_progress.update_task_state(extra_meta=current_step)
|
||||
|
||||
|
||||
def push_students_csv_to_s3(_xmodule_instance_args, _entry_id, course_id, task_input, _action_name):
|
||||
def upload_students_csv(_xmodule_instance_args, _entry_id, course_id, task_input, action_name):
|
||||
"""
|
||||
For a given `course_id`, generate a CSV file containing profile
|
||||
information for all students that are enrolled, and store using a
|
||||
`ReportStore`.
|
||||
"""
|
||||
start_time = time()
|
||||
start_date = datetime.now(UTC)
|
||||
task_progress = TaskProgress(action_name, CourseEnrollment.num_enrolled_in(course_id), start_time)
|
||||
current_step = {'step': 'Calculating Profile Info'}
|
||||
task_progress.update_task_state(extra_meta=current_step)
|
||||
|
||||
# compute the student features table and format it
|
||||
query_features = task_input.get('features')
|
||||
student_data = enrolled_students_features(course_id, query_features)
|
||||
header, rows = format_dictlist(student_data, query_features)
|
||||
|
||||
task_progress.attempted = task_progress.succeeded = len(rows)
|
||||
task_progress.skipped = task_progress.total - task_progress.attempted
|
||||
|
||||
rows.insert(0, header)
|
||||
|
||||
# Perform the upload
|
||||
upload_csv_to_report_store(rows, 'student_profile_info', course_id, datetime.now(UTC))
|
||||
current_step = {'step': 'Uploading CSV'}
|
||||
task_progress.update_task_state(extra_meta=current_step)
|
||||
|
||||
return UPDATE_STATUS_SUCCEEDED
|
||||
# Perform the upload
|
||||
upload_csv_to_report_store(rows, 'student_profile_info', course_id, start_date)
|
||||
|
||||
return task_progress.update_task_state(extra_meta=current_step)
|
||||
|
||||
@@ -19,7 +19,7 @@ from xmodule.modulestore.tests.factories import CourseFactory
|
||||
from student.tests.factories import CourseEnrollmentFactory, UserFactory
|
||||
|
||||
from instructor_task.models import ReportStore
|
||||
from instructor_task.tasks_helper import push_grades_to_s3, push_students_csv_to_s3, UPDATE_STATUS_SUCCEEDED
|
||||
from instructor_task.tasks_helper import upload_grades_csv, upload_students_csv
|
||||
|
||||
|
||||
class TestReport(ModuleStoreTestCase):
|
||||
@@ -36,6 +36,7 @@ class TestReport(ModuleStoreTestCase):
|
||||
def create_student(self, username, email):
|
||||
student = UserFactory.create(username=username, email=email)
|
||||
CourseEnrollmentFactory.create(user=student, course_id=self.course.id)
|
||||
return student
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@@ -55,9 +56,26 @@ class TestInstructorGradeReport(TestReport):
|
||||
self.current_task.update_state = Mock()
|
||||
with patch('instructor_task.tasks_helper._get_current_task') as mock_current_task:
|
||||
mock_current_task.return_value = self.current_task
|
||||
result = push_grades_to_s3(None, None, self.course.id, None, 'graded')
|
||||
#This assertion simply confirms that the generation completed with no errors
|
||||
self.assertEquals(result['succeeded'], result['attempted'])
|
||||
result = upload_grades_csv(None, None, self.course.id, None, 'graded')
|
||||
num_students = len(emails)
|
||||
self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result)
|
||||
|
||||
@patch('instructor_task.tasks_helper._get_current_task')
|
||||
@patch('instructor_task.tasks_helper.iterate_grades_for')
|
||||
def test_grading_failure(self, mock_iterate_grades_for, _mock_current_task):
|
||||
"""
|
||||
Test that any grading errors are properly reported in the
|
||||
progress dict and uploaded to the report store.
|
||||
"""
|
||||
# mock an error response from `iterate_grades_for`
|
||||
mock_iterate_grades_for.return_value = [
|
||||
(self.create_student('username', 'student@example.com'), {}, 'Cannot grade student')
|
||||
]
|
||||
result = upload_grades_csv(None, None, self.course.id, None, 'graded')
|
||||
self.assertDictContainsSubset({'attempted': 1, 'succeeded': 0, 'failed': 1}, result)
|
||||
|
||||
report_store = ReportStore.from_config()
|
||||
self.assertTrue(any('grade_report_err' in item[0] for item in report_store.links_for(self.course.id)))
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@@ -66,14 +84,15 @@ class TestStudentReport(TestReport):
|
||||
Tests that CSV student profile report generation works.
|
||||
"""
|
||||
def test_success(self):
|
||||
self.create_student('student', 'student@example.com')
|
||||
task_input = {'features': []}
|
||||
with patch('instructor_task.tasks_helper._get_current_task'):
|
||||
result = push_students_csv_to_s3(None, None, self.course.id, task_input, 'calculated')
|
||||
result = upload_students_csv(None, None, self.course.id, task_input, 'calculated')
|
||||
report_store = ReportStore.from_config()
|
||||
links = report_store.links_for(self.course.id)
|
||||
|
||||
self.assertEquals(len(links), 1)
|
||||
self.assertEquals(result, UPDATE_STATUS_SUCCEEDED)
|
||||
self.assertDictContainsSubset({'attempted': 1, 'succeeded': 1, 'failed': 0}, result)
|
||||
|
||||
@ddt.data([u'student', u'student\xec'])
|
||||
def test_unicode_usernames(self, students):
|
||||
@@ -95,6 +114,7 @@ class TestStudentReport(TestReport):
|
||||
}
|
||||
with patch('instructor_task.tasks_helper._get_current_task') as mock_current_task:
|
||||
mock_current_task.return_value = self.current_task
|
||||
result = push_students_csv_to_s3(None, None, self.course.id, task_input, 'calculated')
|
||||
result = upload_students_csv(None, None, self.course.id, task_input, 'calculated')
|
||||
#This assertion simply confirms that the generation completed with no errors
|
||||
self.assertEquals(result, UPDATE_STATUS_SUCCEEDED)
|
||||
num_students = len(students)
|
||||
self.assertDictContainsSubset({'attempted': num_students, 'succeeded': num_students, 'failed': 0}, result)
|
||||
|
||||
@@ -153,7 +153,7 @@ class ReportDownloads
|
||||
|
||||
@$report_downloads_table = @$section.find ".report-downloads-table"
|
||||
|
||||
POLL_INTERVAL = 1000 * 60 * 5 # 5 minutes in ms
|
||||
POLL_INTERVAL = 20000 # 20 seconds, just like the "pending instructor tasks" table
|
||||
@downloads_poller = new window.InstructorDashboard.util.IntervalManager(
|
||||
POLL_INTERVAL, => @reload_report_downloads()
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user