Merge pull request #14370 from edx/sstudent/grades-tasks-logging
add task id to logging and log database exceptions for TNL-6332
This commit is contained in:
@@ -3,6 +3,7 @@ This module contains tasks for asynchronous execution of grade updates.
|
||||
"""
|
||||
|
||||
from celery import task
|
||||
from celery.exceptions import Retry
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.exceptions import ValidationError
|
||||
@@ -34,25 +35,16 @@ log = getLogger(__name__)
|
||||
KNOWN_RETRY_ERRORS = (DatabaseError, ValidationError) # Errors we expect occasionally, should be resolved on retry
|
||||
|
||||
|
||||
# TODO (TNL-6373) DELETE ME once v3 is successfully deployed to Prod.
|
||||
@task(base=PersistOnFailureTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
|
||||
def recalculate_subsection_grade_v2(**kwargs):
|
||||
"""
|
||||
Shim to support tasks enqueued by older workers during initial deployment.
|
||||
"""
|
||||
_recalculate_subsection_grade(recalculate_subsection_grade_v2, **kwargs)
|
||||
|
||||
|
||||
@task(base=PersistOnFailureTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
|
||||
def recalculate_subsection_grade_v3(**kwargs):
|
||||
@task(bind=True, base=PersistOnFailureTask, default_retry_delay=30, routing_key=settings.RECALCULATE_GRADES_ROUTING_KEY)
|
||||
def recalculate_subsection_grade_v3(self, **kwargs):
|
||||
"""
|
||||
Latest version of the recalculate_subsection_grade task. See docstring
|
||||
for _recalculate_subsection_grade for further description.
|
||||
"""
|
||||
_recalculate_subsection_grade(recalculate_subsection_grade_v3, **kwargs)
|
||||
_recalculate_subsection_grade(self, **kwargs)
|
||||
|
||||
|
||||
def _recalculate_subsection_grade(task_func, **kwargs):
|
||||
def _recalculate_subsection_grade(self, **kwargs):
|
||||
"""
|
||||
Updates a saved subsection grade.
|
||||
|
||||
@@ -94,18 +86,10 @@ def _recalculate_subsection_grade(task_func, **kwargs):
|
||||
# created. This race condition occurs if the transaction in the task
|
||||
# creator's process hasn't committed before the task initiates in the worker
|
||||
# process.
|
||||
if task_func == recalculate_subsection_grade_v2:
|
||||
has_database_updated = _has_db_updated_with_new_score_bwc_v2(
|
||||
kwargs['user_id'],
|
||||
scored_block_usage_key,
|
||||
from_timestamp(kwargs['expected_modified_time']),
|
||||
kwargs['score_deleted'],
|
||||
)
|
||||
else:
|
||||
has_database_updated = _has_db_updated_with_new_score(scored_block_usage_key, **kwargs)
|
||||
has_database_updated = _has_db_updated_with_new_score(self, scored_block_usage_key, **kwargs)
|
||||
|
||||
if not has_database_updated:
|
||||
raise _retry_recalculate_subsection_grade(task_func, **kwargs)
|
||||
raise _retry_recalculate_subsection_grade(self, **kwargs)
|
||||
|
||||
_update_subsection_grades(
|
||||
course_key,
|
||||
@@ -113,17 +97,19 @@ def _recalculate_subsection_grade(task_func, **kwargs):
|
||||
kwargs['only_if_higher'],
|
||||
kwargs['user_id'],
|
||||
)
|
||||
|
||||
except Retry:
|
||||
raise
|
||||
except Exception as exc: # pylint: disable=broad-except
|
||||
if not isinstance(exc, KNOWN_RETRY_ERRORS):
|
||||
log.info("tnl-6244 grades unexpected failure: {}. kwargs={}".format(
|
||||
log.info("tnl-6244 grades unexpected failure: {}. task id: {}. kwargs={}".format(
|
||||
repr(exc),
|
||||
kwargs
|
||||
self.request.id,
|
||||
kwargs,
|
||||
))
|
||||
raise _retry_recalculate_subsection_grade(task_func, exc=exc, **kwargs)
|
||||
raise _retry_recalculate_subsection_grade(self, exc=exc, **kwargs)
|
||||
|
||||
|
||||
def _has_db_updated_with_new_score(scored_block_usage_key, **kwargs):
|
||||
def _has_db_updated_with_new_score(self, scored_block_usage_key, **kwargs):
|
||||
"""
|
||||
Returns whether the database has been updated with the
|
||||
expected new score values for the given problem and user.
|
||||
@@ -147,48 +133,21 @@ def _has_db_updated_with_new_score(scored_block_usage_key, **kwargs):
|
||||
if score is None:
|
||||
# score should be None only if it was deleted.
|
||||
# Otherwise, it hasn't yet been saved.
|
||||
return kwargs['score_deleted']
|
||||
|
||||
return found_modified_time >= from_timestamp(kwargs['expected_modified_time'])
|
||||
|
||||
|
||||
# TODO (TNL-6373) DELETE ME once v3 is successfully deployed to Prod.
|
||||
def _has_db_updated_with_new_score_bwc_v2(
|
||||
user_id, scored_block_usage_key, expected_modified_time, score_deleted,
|
||||
):
|
||||
"""
|
||||
DEPRECATED version for backward compatibility with v2 tasks.
|
||||
|
||||
Returns whether the database has been updated with the
|
||||
expected new score values for the given problem and user.
|
||||
"""
|
||||
score = get_score(user_id, scored_block_usage_key)
|
||||
|
||||
if score is None:
|
||||
# score should be None only if it was deleted.
|
||||
# Otherwise, it hasn't yet been saved.
|
||||
return score_deleted
|
||||
elif score.module_type == 'openassessment':
|
||||
anon_id = anonymous_id_for_user(User.objects.get(id=user_id), scored_block_usage_key.course_key)
|
||||
course_id = unicode(scored_block_usage_key.course_key)
|
||||
item_id = unicode(scored_block_usage_key)
|
||||
|
||||
api_score = sub_api.get_score(
|
||||
{
|
||||
"student_id": anon_id,
|
||||
"course_id": course_id,
|
||||
"item_id": item_id,
|
||||
"item_type": "openassessment"
|
||||
}
|
||||
)
|
||||
if api_score is None:
|
||||
# Same case as the initial 'if' above, for submissions-specific scores
|
||||
return score_deleted
|
||||
reported_modified_time = api_score['created_at']
|
||||
db_is_updated = kwargs['score_deleted']
|
||||
else:
|
||||
reported_modified_time = score.modified
|
||||
db_is_updated = found_modified_time >= from_timestamp(kwargs['expected_modified_time'])
|
||||
|
||||
return reported_modified_time >= expected_modified_time
|
||||
if not db_is_updated:
|
||||
log.info(
|
||||
u"Persistent Grades: tasks._has_database_updated_with_new_score is False. Task ID: {}. Kwargs: {}. Found "
|
||||
u"modified time: {}".format(
|
||||
self.request.id,
|
||||
kwargs,
|
||||
found_modified_time,
|
||||
)
|
||||
)
|
||||
|
||||
return db_is_updated
|
||||
|
||||
|
||||
def _update_subsection_grades(
|
||||
@@ -231,9 +190,9 @@ def _update_subsection_grades(
|
||||
)
|
||||
|
||||
|
||||
def _retry_recalculate_subsection_grade(task_func, exc=None, **kwargs):
|
||||
def _retry_recalculate_subsection_grade(self, exc=None, **kwargs):
|
||||
"""
|
||||
Calls retry for the recalculate_subsection_grade task with the
|
||||
given inputs.
|
||||
"""
|
||||
task_func.retry(kwargs=kwargs, exc=exc)
|
||||
self.retry(kwargs=kwargs, exc=exc)
|
||||
|
||||
@@ -229,10 +229,10 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
|
||||
|
||||
@ddt.data(ScoreDatabaseTableEnum.courseware_student_module, ScoreDatabaseTableEnum.submissions)
|
||||
@patch('lms.djangoapps.grades.tasks.recalculate_subsection_grade_v3.retry')
|
||||
def test_retry_when_db_not_updated(self, score_db_table, mock_retry):
|
||||
@patch('lms.djangoapps.grades.tasks.log')
|
||||
def test_retry_when_db_not_updated(self, score_db_table, mock_log, mock_retry):
|
||||
self.set_up_course()
|
||||
self.recalculate_subsection_grade_kwargs['score_db_table'] = score_db_table
|
||||
|
||||
modified_datetime = datetime.utcnow().replace(tzinfo=pytz.UTC) - timedelta(days=1)
|
||||
if score_db_table == ScoreDatabaseTableEnum.submissions:
|
||||
with patch('lms.djangoapps.grades.tasks.sub_api.get_score') as mock_sub_score:
|
||||
@@ -248,6 +248,10 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
|
||||
)
|
||||
|
||||
self._assert_retry_called(mock_retry)
|
||||
self.assertIn(
|
||||
u"Persistent Grades: tasks._has_database_updated_with_new_score is False.",
|
||||
mock_log.info.call_args_list[0][0][0]
|
||||
)
|
||||
|
||||
@ddt.data(
|
||||
*itertools.product(
|
||||
@@ -257,7 +261,8 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
|
||||
)
|
||||
@ddt.unpack
|
||||
@patch('lms.djangoapps.grades.tasks.recalculate_subsection_grade_v3.retry')
|
||||
def test_when_no_score_found(self, score_deleted, score_db_table, mock_retry):
|
||||
@patch('lms.djangoapps.grades.tasks.log')
|
||||
def test_when_no_score_found(self, score_deleted, score_db_table, mock_log, mock_retry):
|
||||
self.set_up_course()
|
||||
self.recalculate_subsection_grade_kwargs['score_deleted'] = score_deleted
|
||||
self.recalculate_subsection_grade_kwargs['score_db_table'] = score_db_table
|
||||
@@ -275,6 +280,10 @@ class RecalculateSubsectionGradeTest(ModuleStoreTestCase):
|
||||
self._assert_retry_not_called(mock_retry)
|
||||
else:
|
||||
self._assert_retry_called(mock_retry)
|
||||
self.assertIn(
|
||||
u"Persistent Grades: tasks._has_database_updated_with_new_score is False.",
|
||||
mock_log.info.call_args_list[0][0][0]
|
||||
)
|
||||
|
||||
@patch('lms.djangoapps.grades.tasks.log')
|
||||
@patch('lms.djangoapps.grades.tasks.recalculate_subsection_grade_v3.retry')
|
||||
|
||||
Reference in New Issue
Block a user