feat: management command for consuming kafka events (#29838)

This commit is contained in:
Rebecca Graber
2022-02-03 12:39:31 -05:00
committed by GitHub
parent 7c7792f92a
commit 47aed8d2ab
14 changed files with 287 additions and 3 deletions

View File

@@ -19,7 +19,7 @@ jobs:
- module-name: lms-2
path: "lms/djangoapps/gating/ lms/djangoapps/grades/ lms/djangoapps/instructor/ lms/djangoapps/instructor_analytics/ lms/djangoapps/discussion/ lms/djangoapps/edxnotes/ lms/djangoapps/email_marketing/ lms/djangoapps/experiments/ lms/djangoapps/instructor_task/ lms/djangoapps/learner_dashboard/ lms/djangoapps/lms_initialization/ lms/djangoapps/lms_xblock/ lms/djangoapps/lti_provider/ lms/djangoapps/mailing/ lms/djangoapps/mobile_api/ lms/djangoapps/monitoring/ lms/djangoapps/ora_staff_grader/ lms/djangoapps/program_enrollments/ lms/djangoapps/rss_proxy lms/djangoapps/static_template_view/ lms/djangoapps/staticbook/ lms/djangoapps/support/ lms/djangoapps/survey/ lms/djangoapps/teams/ lms/djangoapps/tests/ lms/djangoapps/user_tours/ lms/djangoapps/verify_student/ lms/envs/ lms/lib/ lms/tests.py"
- module-name: openedx-1
path: "openedx/core/types/ openedx/core/djangoapps/ace_common/ openedx/core/djangoapps/agreements/ openedx/core/djangoapps/api_admin/ openedx/core/djangoapps/auth_exchange/ openedx/core/djangoapps/bookmarks/ openedx/core/djangoapps/cache_toolbox/ openedx/core/djangoapps/catalog/ openedx/core/djangoapps/ccxcon/ openedx/core/djangoapps/commerce/ openedx/core/djangoapps/common_initialization/ openedx/core/djangoapps/common_views/ openedx/core/djangoapps/config_model_utils/ openedx/core/djangoapps/content/ openedx/core/djangoapps/content_libraries/ openedx/core/djangoapps/contentserver/ openedx/core/djangoapps/cookie_metadata/ openedx/core/djangoapps/cors_csrf/ openedx/core/djangoapps/course_apps/ openedx/core/djangoapps/course_date_signals/ openedx/core/djangoapps/course_groups/ openedx/core/djangoapps/coursegraph/ openedx/core/djangoapps/courseware_api/ openedx/core/djangoapps/crawlers/ openedx/core/djangoapps/credentials/ openedx/core/djangoapps/credit/ openedx/core/djangoapps/dark_lang/ openedx/core/djangoapps/debug/ openedx/core/djangoapps/demographics/ openedx/core/djangoapps/discussions/ openedx/core/djangoapps/django_comment_common/ openedx/core/djangoapps/embargo/ openedx/core/djangoapps/enrollments/ openedx/core/djangoapps/external_user_ids/ openedx/core/djangoapps/zendesk_proxy/ openedx/core/djangolib/ openedx/core/lib/ openedx/core/tests/"
path: "openedx/core/types/ openedx/core/djangoapps/ace_common/ openedx/core/djangoapps/agreements/ openedx/core/djangoapps/api_admin/ openedx/core/djangoapps/auth_exchange/ openedx/core/djangoapps/bookmarks/ openedx/core/djangoapps/cache_toolbox/ openedx/core/djangoapps/catalog/ openedx/core/djangoapps/ccxcon/ openedx/core/djangoapps/commerce/ openedx/core/djangoapps/common_initialization/ openedx/core/djangoapps/common_views/ openedx/core/djangoapps/config_model_utils/ openedx/core/djangoapps/content/ openedx/core/djangoapps/content_libraries/ openedx/core/djangoapps/contentserver/ openedx/core/djangoapps/cookie_metadata/ openedx/core/djangoapps/cors_csrf/ openedx/core/djangoapps/course_apps/ openedx/core/djangoapps/course_date_signals/ openedx/core/djangoapps/course_groups/ openedx/core/djangoapps/coursegraph/ openedx/core/djangoapps/courseware_api/ openedx/core/djangoapps/crawlers/ openedx/core/djangoapps/credentials/ openedx/core/djangoapps/credit/ openedx/core/djangoapps/dark_lang/ openedx/core/djangoapps/debug/ openedx/core/djangoapps/demographics/ openedx/core/djangoapps/discussions/ openedx/core/djangoapps/django_comment_common/ openedx/core/djangoapps/embargo/ openedx/core/djangoapps/enrollments/ openedx/core/djangoapps/external_user_ids/ openedx/core/djangoapps/zendesk_proxy/ openedx/core/djangolib/ openedx/core/lib/ openedx/core/tests/ openedx/core/djangoapps/kafka_consumer/"
- module-name: openedx-2
path: "openedx/core/djangoapps/geoinfo/ openedx/core/djangoapps/header_control/ openedx/core/djangoapps/heartbeat/ openedx/core/djangoapps/lang_pref/ openedx/core/djangoapps/models/ openedx/core/djangoapps/monkey_patch/ openedx/core/djangoapps/oauth_dispatch/ openedx/core/djangoapps/olx_rest_api/ openedx/core/djangoapps/password_policy/ openedx/core/djangoapps/plugin_api/ openedx/core/djangoapps/plugins/ openedx/core/djangoapps/profile_images/ openedx/core/djangoapps/programs/ openedx/core/djangoapps/safe_sessions/ openedx/core/djangoapps/schedules/ openedx/core/djangoapps/self_paced/ openedx/core/djangoapps/service_status/ openedx/core/djangoapps/session_inactivity_timeout/ openedx/core/djangoapps/signals/ openedx/core/djangoapps/site_configuration/ openedx/core/djangoapps/system_wide_roles/ openedx/core/djangoapps/theming/ openedx/core/djangoapps/user_api/ openedx/core/djangoapps/user_authn/ openedx/core/djangoapps/util/ openedx/core/djangoapps/verified_track_content/ openedx/core/djangoapps/video_config/ openedx/core/djangoapps/video_pipeline/ openedx/core/djangoapps/waffle_utils/ openedx/core/djangoapps/xblock/ openedx/core/djangoapps/xmodule_django/ openedx/core/tests/ openedx/features/ openedx/testing/ openedx/tests/"
- module-name: common

View File

@@ -3197,7 +3197,10 @@ INSTALLED_APPS = [
'edx_ace',
# For save for later
'lms.djangoapps.save_for_later'
'lms.djangoapps.save_for_later',
# TODO (EventBus): Make Kafka/event-bus optional
'openedx.core.djangoapps.kafka_consumer',
]
######################### CSRF #########################################

View File

@@ -0,0 +1,17 @@
===============
Kafka Consumer
===============
Purpose
=======
This is a (likely temporary) app created to test and iterate on event bus consumer patterns. The goal is to eventually
have a flexible event bus that can be easily brought into other apps and repositories to produce and consume arbitrary
topics. Ideally, the event bus itself will also be an abstraction behind which platform maintainers can use non-Kafka
implementations (Redis, Pulsar, etc.). The documentation/ADRs may also be moved to more appropriate places as the
process matures.
There are a hefty number of "# TODO (EventBus)" annotations left in to help guide further development. This app is intended to be subject to frequent and rapid changes. Outside of testing this app, it is best to leave the
KAFKA_CONSUMERS_ENABLED setting off.

View File

@@ -0,0 +1,7 @@
"""App for consuming Kafka events. Comprises a management command for listening to a topic and supporting methods.
Likely temporary."""
from django.apps import AppConfig
class KafkaConsumerApp(AppConfig):
    """
    Django AppConfig for the (likely temporary) Kafka event-consumer app.

    The app holds only a management command plus supporting deserialization
    helpers; it defines no models and no ready() hooks.
    """
    name = 'openedx.core.djangoapps.kafka_consumer'

View File

@@ -0,0 +1,99 @@
"""Event handling for license-manager events. Likely temporary until an abstraction layer is put in place."""
import logging
from confluent_kafka.schema_registry.avro import AvroDeserializer
from django.conf import settings
logger = logging.getLogger(__name__)
# TODO (EventBus):
# Use TrackingEvent class from openedx_events and use Attr <-> Avro bridge to deserialize
class TrackingEvent:
    """
    Temporary copy of TrackingEvent from license-manager to represent license manager events.

    Eventually to be moved to openedx_events. Positional args are accepted but
    ignored; every field defaults to None when not supplied.
    """

    # All payload fields carried by the event.
    # NOTE(review): ``activation_date`` is set here and emitted by ``to_dict``
    # but is absent from the Avro schema below, so it can never survive a
    # serialize/deserialize round trip — confirm against the producer.
    _EVENT_FIELDS = (
        'license_uuid',
        'license_activation_key',
        'previous_license_uuid',
        'assigned_date',
        'activation_date',
        'assigned_lms_user_id',
        'assigned_email',
        'expiration_processed',
        'auto_applied',
        'enterprise_customer_uuid',
        'enterprise_customer_slug',
        'enterprise_customer_name',
        'customer_agreement_uuid',
    )

    def __init__(self, *args, **kwargs):
        # kwargs.get(name) defaults to None, matching the previous explicit
        # per-field ``kwargs.get(name, None)`` assignments.
        for field_name in self._EVENT_FIELDS:
            setattr(self, field_name, kwargs.get(field_name))

    # Some paths will set assigned_lms_user_id to '' if empty, so the schema must allow strings.
    # NOTE(review): the union defaults below are the *strings* "null"/"false",
    # not JSON null/false — verify Avro tooling accepts them as registered.
    TRACKING_EVENT_AVRO_SCHEMA = """
    {
        "namespace": "license_manager.apps.subscriptions",
        "name": "TrackingEvent",
        "type": "record",
        "fields": [
            {"name": "license_uuid", "type": "string"},
            {"name": "license_activation_key", "type": "string"},
            {"name": "previous_license_uuid", "type": "string"},
            {"name": "assigned_date", "type": "string"},
            {"name": "assigned_lms_user_id", "type": ["int", "string", "null"], "default": "null"},
            {"name": "assigned_email", "type":"string"},
            {"name": "expiration_processed", "type": "boolean"},
            {"name": "auto_applied", "type": "boolean", "default": "false"},
            {"name": "enterprise_customer_uuid", "type": ["string", "null"], "default": "null"},
            {"name": "customer_agreement_uuid", "type": ["string", "null"], "default": "null"},
            {"name": "enterprise_customer_slug", "type": ["string", "null"], "default": "null"},
            {"name": "enterprise_customer_name", "type": ["string", "null"], "default": "null"}
        ]
    }
    """

    @staticmethod
    def from_dict(dict_instance, ctx=None):  # pylint: disable=unused-argument
        """Build a TrackingEvent from a deserialized Avro dict (``ctx`` is unused)."""
        return TrackingEvent(**dict_instance)

    @staticmethod
    def to_dict(obj, ctx=None):  # pylint: disable=unused-argument
        """
        Render ``obj`` as a plain dict for logging.

        Deliberately omits ``assigned_lms_user_id`` and ``assigned_email`` so
        the event consumer does not print PII.
        """
        return {
            "enterprise_customer_uuid": obj.enterprise_customer_uuid,
            "customer_agreement_uuid": obj.customer_agreement_uuid,
            "enterprise_customer_slug": obj.enterprise_customer_slug,
            "enterprise_customer_name": obj.enterprise_customer_name,
            "license_uuid": obj.license_uuid,
            "license_activation_key": obj.license_activation_key,
            "previous_license_uuid": obj.previous_license_uuid,
            "assigned_date": obj.assigned_date,
            "activation_date": obj.activation_date,
            "expiration_processed": obj.expiration_processed,
            "auto_applied": obj.auto_applied or False,
        }
## TODO (EventBus): Make an extensible base class for message handlers
class LicenseMessageHandler:
    """Simple class to deserialize and log LicenseEvents from Kafka."""

    @staticmethod
    def getDeserializer(schema_registry_client):
        """
        Return an AvroDeserializer for license events.

        Uses the schema-registry client to resolve writer schemas and
        TrackingEvent.from_dict to rebuild event objects.
        """
        return AvroDeserializer(
            schema_str=TrackingEvent.TRACKING_EVENT_AVRO_SCHEMA,
            schema_registry_client=schema_registry_client,
            from_dict=TrackingEvent.from_dict,
        )

    @staticmethod
    def handleMessage(msg):
        """Log the message key and its PII-scrubbed payload."""
        # Lazy %-style args: the (relatively expensive) to_dict rendering and
        # string formatting only happen if INFO logging is actually enabled.
        logger.info(
            "Received message with key %s and value %s",
            msg.key(),
            TrackingEvent.to_dict(msg.value()),
        )
## TODO (EventBus): Find a better place to have the list of topics
def getHandler(topic_name):
    """
    Return the message-handler class registered for ``topic_name``.

    The mapping is built at call time so Django settings are read lazily
    rather than at module import. Raises KeyError for an unknown topic.
    """
    topic_handlers = {
        settings.LICENSE_EVENT_TOPIC_NAME: LicenseMessageHandler,
    }
    return topic_handlers[topic_name]

View File

@@ -0,0 +1,30 @@
Managing Kafka Consumers
------------------------
Status
======
In Progress
Context
=======
As outlined in the upcoming OEP-52, edX.org has elected to go with Apache Kafka as our event bus implementation. Though the decision presented here is predicated on this particular edX.org decision, it is included to help other Open edX users evaluate Kafka for their own purposes. The standard pattern for consuming events with Kafka is to poll in a loop and process messages as they come in. According to the Confluent team it is a best practice to limit each consumer to a single topic (Confluent is a platform for industry-scale Kafka management)::
consumer.subscribe(["topic"])
while True:
message = consumer.poll()
## process message
This ``while True`` loop means whatever is running this consumer will run infinitely and block whatever thread runs it from doing anything else. Thus, this code cannot be run as part of the regular Django web server. It also would not fit neatly onto a celery task, which would put it in direct competition for workers with all other celery tasks and be difficult to scale as the number of topics increases.
Decision
========
edX.org will use Kubernetes to manage containers whose sole purpose is to run a management command, which in turn will run a polling loop against the specified topic. This will enable standard horizontal scaling of Kafka consumer groups.
Rejected Alternatives
=====================
#. Create a new ASG of EC2 instances dedicated to running a consumer management command, similar to how we create instances dedicated to running celery workers
* edX and the industry in general are moving away from the ASG pattern and on to Kubernetes. Both the ASG approach and the Kubernetes approach would require a substantial amount of work in order to make the number of instances scalable based on number of topics rather than built-in measurements like CPU load. Based on this, it makes more sense to put in the effort in Kubernetes rather than creating more outdated infrastructure.
#. Django-channels
* Research turned up the possibility of using django-channels (websocket equivalent for Django) for use with Kafka, but the design and potential benefit was unclear so this was not pursued further

View File

@@ -0,0 +1,118 @@
"""
Management command for listening to license-manager events and logging them
"""
import logging
from confluent_kafka import DeserializingConsumer, KafkaError
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.serialization import StringDeserializer
from django.conf import settings
from django.core.management.base import BaseCommand
from edx_toggles.toggles import SettingToggle
from openedx.core.djangoapps.kafka_consumer.consumers import getHandler
logger = logging.getLogger(__name__)
# .. toggle_name: KAFKA_CONSUMERS_ENABLED
# .. toggle_implementation: SettingToggle
# .. toggle_default: False
# .. toggle_description: Enables the ability to listen and process events from the Kafka event bus
# .. toggle_use_cases: opt_in
# .. toggle_creation_date: 2022-01-31
# .. toggle_tickets: https://openedx.atlassian.net/browse/ARCHBOM-1992
KAFKA_CONSUMERS_ENABLED = SettingToggle('KAFKA_CONSUMERS_ENABLED', default=False)

# Seconds each consumer.poll() call blocks waiting for a message before
# returning None; overridable via the CONSUMER_POLL_TIMEOUT Django setting.
CONSUMER_POLL_TIMEOUT = getattr(settings, 'CONSUMER_POLL_TIMEOUT', 1.0)
class Command(BaseCommand):
    """
    Listen for events from the event bus and log them. Only run on servers where KAFKA_CONSUMERS_ENABLED is true
    """
    help = """
    This starts a Kafka event consumer that listens to the specified topic and logs all messages it receives. Topic
    is required.

    example:
        manage.py ... consume_events -t license-event-prod -g license-event-consumers

    # TODO (EventBus): Add pointer to relevant future docs around topics and consumer groups, and potentially
    update example topic and group names to follow any future naming conventions.
    """

    def add_arguments(self, parser):
        """Register the required --topic/--group_id options (nargs=1 makes each a 1-element list)."""
        parser.add_argument(
            '-t', '--topic',
            nargs=1,
            required=True,
            help='Topic to consume'
        )
        parser.add_argument(
            '-g', '--group_id',
            nargs=1,
            required=True,
            help='Consumer group id'
        )

    def handle(self, *args, **options):
        """
        Poll the requested topic forever, dispatching each message to its handler.

        Returns immediately (with an error log) when KAFKA_CONSUMERS_ENABLED is
        off. Any exception is caught and logged so the command exits cleanly.
        """
        if not KAFKA_CONSUMERS_ENABLED.is_enabled():
            logger.error("Kafka consumers not enabled")
            return
        try:
            # Credentials for the Confluent schema registry, used by the value
            # deserializer to fetch writer schemas.
            schema_registry_config = {
                'url': settings.SCHEMA_REGISTRY_URL,
                'basic.auth.user.info': f"{settings.SCHEMA_REGISTRY_API_KEY}:{settings.SCHEMA_REGISTRY_API_SECRET}",
            }
            schema_registry_client = SchemaRegistryClient(schema_registry_config)
            topic = options['topic'][0]
            HandlerClass = getHandler(topic)

            # TODO (EventBus):
            # 1. generalize configurations to allow connection to local Kafka clusters without SSL
            # 2. Reevaluate if all consumers should listen for the earliest unprocessed offset (auto.offset.reset)
            consumer = DeserializingConsumer({
                'bootstrap.servers': settings.KAFKA_BOOTSTRAP_SERVER,
                'group.id': options["group_id"][0],
                'key.deserializer': StringDeserializer('utf-8'),
                'value.deserializer': HandlerClass.getDeserializer(schema_registry_client),
                'auto.offset.reset': 'earliest',
                'sasl.mechanism': 'PLAIN',
                'security.protocol': 'SASL_SSL',
                'sasl.username': settings.KAFKA_API_KEY,
                'sasl.password': settings.KAFKA_API_SECRET,
            })

            try:
                consumer.subscribe([topic])
                # TODO (EventBus):
                # 1. Is there an elegant way to exit the loop?
                # 2. Determine if there are other errors that shouldn't kill the entire loop
                while True:
                    msg = consumer.poll(timeout=CONSUMER_POLL_TIMEOUT)
                    if msg is None:
                        # Poll timed out with no message; keep waiting.
                        continue
                    if msg.error():
                        # TODO (EventBus): iterate on error handling with retry and dead-letter queue topics
                        if msg.error().code() == KafkaError._PARTITION_EOF:  # pylint: disable=protected-access
                            # End-of-partition event: informational, not a failure.
                            # Fix: Message.offset is a method; previously it was
                            # interpolated uncalled, logging a bound-method repr.
                            logger.info(
                                "%s [%d] reached end at offset %d",
                                msg.topic(), msg.partition(), msg.offset()
                            )
                        else:
                            # Was a redundant ``elif msg.error():`` — always true here.
                            # NOTE(review): logger.exception outside an except block logs
                            # "NoneType: None" as the traceback; preserved for now.
                            logger.exception(msg.error())
                        continue
                    HandlerClass.handleMessage(msg)
            finally:
                # Close down consumer to commit final offsets.
                consumer.close()
                logger.info("Committing final offsets")
        except Exception:  # pylint: disable=broad-except
            logger.exception("Error consuming Kafka events")

View File

@@ -36,6 +36,7 @@ botocore==1.8.17 # via boto3, s3transfer
bridgekeeper # Used for determining permissions for courseware.
celery # Asynchronous task execution library
chem # A helper library for chemistry calculations
confluent_kafka
contextlib2 # We need contextlib2.ExitStack so we can stop using contextlib.nested which doesn't exist in python 3
crowdsourcehinter-xblock
cryptography # Implementations of assorted cryptography algorithms
@@ -96,6 +97,7 @@ edx-user-state-client
edx-when
edxval
event-tracking
fastavro
fs==2.0.18
fs-s3fs==0.1.8
geoip2 # Python API for the GeoIP web services and databases

View File

@@ -131,6 +131,8 @@ code-annotations==1.2.0
# via
# edx-enterprise
# edx-toggles
confluent-kafka==1.8.2
# via -r requirements/edx/base.in
contextlib2==21.6.0
# via -r requirements/edx/base.in
coreapi==2.3.3
@@ -523,7 +525,9 @@ event-tracking==1.1.4
# edx-proctoring
# edx-search
fastavro==1.4.9
# via openedx-events
# via
# -r requirements/edx/base.in
# openedx-events
frozenlist==1.3.0
# via
# aiohttp

View File

@@ -185,6 +185,8 @@ code-annotations==1.2.0
# edx-enterprise
# edx-lint
# edx-toggles
confluent-kafka==1.8.2
# via -r requirements/edx/testing.txt
contextlib2==21.6.0
# via -r requirements/edx/testing.txt
coreapi==2.3.3

View File

@@ -176,6 +176,8 @@ code-annotations==1.2.0
# edx-enterprise
# edx-lint
# edx-toggles
confluent-kafka==1.8.2
# via -r requirements/edx/base.txt
contextlib2==21.6.0
# via -r requirements/edx/base.txt
coreapi==2.3.3