Add cache-based locking to subtasks to ensure that the same task is not running in two workers at the same time.
This commit is contained in:
@@ -24,6 +24,7 @@ from instructor_task.models import InstructorTask
|
||||
from instructor_task.subtasks import (
|
||||
create_subtask_status,
|
||||
initialize_subtask_info,
|
||||
check_subtask_is_valid,
|
||||
update_subtask_status,
|
||||
DuplicateTaskException,
|
||||
)
|
||||
@@ -217,7 +218,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
subtask_id = "subtask-id-value"
|
||||
subtask_status = create_subtask_status(subtask_id)
|
||||
email_id = 1001
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find email subtasks of instructor task'):
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find subtasks of instructor task'):
|
||||
send_course_email(entry_id, email_id, to_list, global_email_context, subtask_status)
|
||||
|
||||
def test_send_email_missing_subtask(self):
|
||||
@@ -231,7 +232,7 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
different_subtask_id = "bogus-subtask-id-value"
|
||||
subtask_status = create_subtask_status(different_subtask_id)
|
||||
bogus_email_id = 1001
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for email subtask of instructor task'):
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'unable to find status for subtask of instructor task'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, subtask_status)
|
||||
|
||||
def test_send_email_completed_subtask(self):
|
||||
@@ -249,6 +250,21 @@ class TestEmailErrors(ModuleStoreTestCase):
|
||||
with self.assertRaisesRegexp(DuplicateTaskException, 'already completed'):
|
||||
send_course_email(entry_id, bogus_email_id, to_list, global_email_context, new_subtask_status)
|
||||
|
||||
def test_send_email_running_subtask(self):
    # Drive the lower-level API directly, so that the checks further down
    # (including the course-level ones) are exercised as well.
    task_entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
    task_entry_id = task_entry.id  # pylint: disable=E1101
    running_subtask_id = "subtask-id-value"
    initialize_subtask_info(task_entry, "emailed", 100, [running_subtask_id])
    status = create_subtask_status(running_subtask_id)
    update_subtask_status(task_entry_id, running_subtask_id, status)
    # This call takes the subtask lock, simulating another worker that has
    # already started (but not finished) working on this subtask.
    check_subtask_is_valid(task_entry_id, running_subtask_id)
    recipient_list = ['test@test.com']
    email_context = {'course_title': 'dummy course'}
    with self.assertRaisesRegexp(DuplicateTaskException, 'already being executed'):
        send_course_email(task_entry_id, 1001, recipient_list, email_context, status)
|
||||
|
||||
def dont_test_send_email_undefined_email(self):
|
||||
# test at a lower level, to ensure that the course gets checked down below too.
|
||||
entry = InstructorTask.create(self.course.id, "task_type", "task_key", "task_input", self.instructor)
|
||||
|
||||
@@ -8,11 +8,15 @@ from celery.utils.log import get_task_logger
|
||||
from celery.states import SUCCESS, READY_STATES
|
||||
|
||||
from django.db import transaction
|
||||
from django.core.cache import cache
|
||||
|
||||
from instructor_task.models import InstructorTask, PROGRESS, QUEUING
|
||||
|
||||
TASK_LOG = get_task_logger(__name__)
|
||||
|
||||
# Lock expiration should be long enough to allow a send_course_email task to complete.
|
||||
SUBTASK_LOCK_EXPIRE = 60 * 10 # Lock expires in 10 minutes
|
||||
|
||||
|
||||
class DuplicateTaskException(Exception):
    """Raised when a subtask is found to already exist or to have already run to completion."""
|
||||
@@ -152,6 +156,37 @@ def initialize_subtask_info(entry, action_name, total_num, subtask_id_list):
|
||||
return task_progress
|
||||
|
||||
|
||||
def _acquire_subtask_lock(task_id):
    """
    Record that work on the given task_id is underway.

    Guards against the same subtask being processed by two workers at once,
    which can happen when Celery requeues a task after losing its connection
    to the task broker.  Such duplicates usually run one after another, but
    they may also overlap in time.

    Returns True when the lock was newly taken, False when it was already held.
    """
    key = "subtask-{}".format(task_id)
    # cache.add only stores the value when the key is absent, so at most one
    # caller can succeed -- this is what makes the lock atomic.
    acquired = cache.add(key, 'true', SUBTASK_LOCK_EXPIRE)
    if not acquired:
        TASK_LOG.warning("task_id '%s': already locked. Contains value '%s'", task_id, cache.get(key))
    return acquired
|
||||
|
||||
|
||||
def _release_subtask_lock(task_id):
    """
    Drop the in-progress marker for the given task_id.

    Releasing the lock matters chiefly so that a retry of the task is able
    to acquire it again.
    """
    # Per the Celery task cookbook, "Memcache delete is very slow, but we have
    # to use it to take advantage of using add() for atomic locking."
    cache.delete("subtask-{}".format(task_id))
|
||||
|
||||
|
||||
def check_subtask_is_valid(entry_id, current_task_id):
|
||||
"""
|
||||
Confirms that the current subtask is known to the InstructorTask and hasn't already been completed.
|
||||
@@ -166,14 +201,16 @@ def check_subtask_is_valid(entry_id, current_task_id):
|
||||
|
||||
If a subtask gets requeued, then the first time the subtask runs it should run fine to completion.
|
||||
However, we want to prevent it from running again, so we check here to see what the existing
|
||||
subtask's status is. If it is complete, we return an exception.
|
||||
subtask's status is. If it is complete, we raise an exception. We also take a lock on the task,
|
||||
so that we can detect if another worker has started work but has not yet completed that work.
|
||||
The other worker is allowed to finish, and this raises an exception.
|
||||
|
||||
Raises a DuplicateTaskException exception if it's not a task that should be run.
|
||||
"""
|
||||
# Confirm that the InstructorTask actually defines subtasks.
|
||||
entry = InstructorTask.objects.get(pk=entry_id)
|
||||
if len(entry.subtasks) == 0:
|
||||
format_str = "Unexpected task_id '{}': unable to find email subtasks of instructor task '{}'"
|
||||
format_str = "Unexpected task_id '{}': unable to find subtasks of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
@@ -182,7 +219,7 @@ def check_subtask_is_valid(entry_id, current_task_id):
|
||||
subtask_dict = json.loads(entry.subtasks)
|
||||
subtask_status_info = subtask_dict['status']
|
||||
if current_task_id not in subtask_status_info:
|
||||
format_str = "Unexpected task_id '{}': unable to find status for email subtask of instructor task '{}'"
|
||||
format_str = "Unexpected task_id '{}': unable to find status for subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
@@ -192,11 +229,20 @@ def check_subtask_is_valid(entry_id, current_task_id):
|
||||
subtask_status = subtask_status_info[current_task_id]
|
||||
subtask_state = subtask_status.get('state')
|
||||
if subtask_state in READY_STATES:
|
||||
format_str = "Unexpected task_id '{}': already completed - status {} for email subtask of instructor task '{}'"
|
||||
format_str = "Unexpected task_id '{}': already completed - status {} for subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, subtask_status, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
# Now we are ready to start working on this. Try to lock it.
|
||||
# If it fails, then it means that another worker is already in the
|
||||
# middle of working on this.
|
||||
if not _acquire_subtask_lock(current_task_id):
|
||||
format_str = "Unexpected task_id '{}': already being executed - for subtask of instructor task '{}'"
|
||||
msg = format_str.format(current_task_id, entry)
|
||||
TASK_LOG.warning(msg)
|
||||
raise DuplicateTaskException(msg)
|
||||
|
||||
|
||||
@transaction.commit_manually
|
||||
def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
@@ -291,3 +337,5 @@ def update_subtask_status(entry_id, current_task_id, new_subtask_status):
|
||||
else:
|
||||
TASK_LOG.debug("about to commit....")
|
||||
transaction.commit()
|
||||
finally:
|
||||
_release_subtask_lock(current_task_id)
|
||||
|
||||
Reference in New Issue
Block a user