Merge pull request #14090 from edx/efischer/alt_queues
Get alternate queues from ENV_TOKENS
This commit is contained in:
@@ -87,12 +87,6 @@ CELERY_QUEUES = {
|
||||
DEFAULT_PRIORITY_QUEUE: {}
|
||||
}
|
||||
|
||||
# Setup alternate queues, to allow access to cross-process workers
|
||||
ALTERNATE_QUEUE_ENVS = os.environ.get('ALTERNATE_WORKER_QUEUES', '').split()
|
||||
ALTERNATE_QUEUES = [
|
||||
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
|
||||
for alternate in ALTERNATE_QUEUE_ENVS
|
||||
]
|
||||
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
|
||||
|
||||
############# NON-SECURE ENV CONFIG ##############################
|
||||
@@ -366,10 +360,17 @@ BROKER_URL = "{0}://{1}:{2}@{3}/{4}".format(CELERY_BROKER_TRANSPORT,
|
||||
CELERY_BROKER_VHOST)
|
||||
BROKER_USE_SSL = ENV_TOKENS.get('CELERY_BROKER_USE_SSL', False)
|
||||
|
||||
# Allow CELERY_QUEUES to be overwritten before adding alternates
|
||||
# Allow CELERY_QUEUES to be overwritten by ENV_TOKENS,
|
||||
ENV_CELERY_QUEUES = ENV_TOKENS.get('CELERY_QUEUES', None)
|
||||
if ENV_CELERY_QUEUES:
|
||||
CELERY_QUEUES = {queue: {} for queue in ENV_CELERY_QUEUES}
|
||||
|
||||
# Then add alternate environment queues
|
||||
ALTERNATE_QUEUE_ENVS = ENV_TOKENS.get('ALTERNATE_WORKER_QUEUES', '').split()
|
||||
ALTERNATE_QUEUES = [
|
||||
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
|
||||
for alternate in ALTERNATE_QUEUE_ENVS
|
||||
]
|
||||
CELERY_QUEUES.update(
|
||||
{
|
||||
alternate: {}
|
||||
|
||||
@@ -98,12 +98,6 @@ CELERY_QUEUES = {
|
||||
HIGH_MEM_QUEUE: {},
|
||||
}
|
||||
|
||||
# Setup alternate queues, to allow access to cross-process workers
|
||||
ALTERNATE_QUEUE_ENVS = os.environ.get('ALTERNATE_WORKER_QUEUES', '').split()
|
||||
ALTERNATE_QUEUES = [
|
||||
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
|
||||
for alternate in ALTERNATE_QUEUE_ENVS
|
||||
]
|
||||
CELERY_ROUTES = "{}celery.Router".format(QUEUE_VARIANT)
|
||||
|
||||
# If we're a worker on the high_mem queue, set ourselves to die after processing
|
||||
@@ -268,10 +262,17 @@ BULK_EMAIL_ROUTING_KEY_SMALL_JOBS = ENV_TOKENS.get('BULK_EMAIL_ROUTING_KEY_SMALL
|
||||
# Queue to use for updating persistent grades
|
||||
RECALCULATE_GRADES_ROUTING_KEY = ENV_TOKENS.get('RECALCULATE_GRADES_ROUTING_KEY', LOW_PRIORITY_QUEUE)
|
||||
|
||||
# Allow CELERY_QUEUES to be overwritten before adding alternates
|
||||
# Allow CELERY_QUEUES to be overwritten by ENV_TOKENS,
|
||||
ENV_CELERY_QUEUES = ENV_TOKENS.get('CELERY_QUEUES', None)
|
||||
if ENV_CELERY_QUEUES:
|
||||
CELERY_QUEUES = {queue: {} for queue in ENV_CELERY_QUEUES}
|
||||
|
||||
# Then add alternate environment queues
|
||||
ALTERNATE_QUEUE_ENVS = ENV_TOKENS.get('ALTERNATE_WORKER_QUEUES', '').split()
|
||||
ALTERNATE_QUEUES = [
|
||||
DEFAULT_PRIORITY_QUEUE.replace(QUEUE_VARIANT, alternate + '.')
|
||||
for alternate in ALTERNATE_QUEUE_ENVS
|
||||
]
|
||||
CELERY_QUEUES.update(
|
||||
{
|
||||
alternate: {}
|
||||
|
||||
Reference in New Issue
Block a user