Files
edx-platform/openedx/core/djangoapps/coursegraph/tasks.py
Kyle McCormick cd3957b987 fix: upgrade py2neo from 3.1.2 to 2021.1.5 (#28480)
* fix: upgrade py2neo from 3.1.2 to 2021.1.5

The dump_to_neo4j management command has not been working
since the upgrade to python 3.8. The latest version of
python that py2neo 3.1.2 states support for is python 3.5,
so this isn't surprising.

The earliest non-prerelease version of py2neo that supports
python 3.8 is 2020.x (skipping the 4.x and 5.x series). Since
we're going as far as a 2020.x, we may as well upgrade all the
way to the newest series, 2021.x. This commit does that upgrade,
as well as a handful of minor code modifications in order
to handle breaking changes that have been made to the py2neo
API, and some unrelated pin bumps as the result of
'make upgrade'.

This will also require an upgrade of Coursegraph's Neo4j
version from 3.2.x to 3.5.x.

TNL-8386
2021-08-25 09:34:41 -04:00

399 lines
13 KiB
Python

"""
This file contains a management command for exporting the modulestore to
neo4j, a graph database.
"""
import logging
from celery import shared_task
from django.utils import timezone
from edx_django_utils.cache import RequestCache
from edx_django_utils.monitoring import set_code_owner_attribute
from opaque_keys.edx.keys import CourseKey
from py2neo import Graph, Node, Relationship, NodeMatcher
log = logging.getLogger(__name__)
celery_log = logging.getLogger('edx.celery.task')
# When testing locally, neo4j's bolt logger was noisy, so we'll only have it
# emit logs if there's an error.
bolt_log = logging.getLogger('neo4j.bolt') # pylint: disable=invalid-name
bolt_log.setLevel(logging.ERROR)
PRIMITIVE_NEO4J_TYPES = (int, bytes, str, float, bool)
def serialize_item(item):
"""
Args:
item: an XBlock
Returns:
fields: a dictionary of an XBlock's field names and values
block_type: the name of the XBlock's type (i.e. 'course'
or 'problem')
"""
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
# convert all fields to a dict and filter out parent and children field
fields = {
field: field_value.read_from(item)
for (field, field_value) in item.fields.items()
if field not in ['parent', 'children']
}
course_key = item.scope_ids.usage_id.course_key
block_type = item.scope_ids.block_type
# set or reset some defaults
fields['edited_on'] = str(getattr(item, 'edited_on', ''))
fields['display_name'] = item.display_name_with_default
fields['org'] = course_key.org
fields['course'] = course_key.course
fields['run'] = course_key.run
fields['course_key'] = str(course_key)
fields['location'] = str(item.location)
fields['block_type'] = block_type
fields['detached'] = block_type in DETACHED_XBLOCK_TYPES
if block_type == 'course':
# prune the checklists field
if 'checklists' in fields:
del fields['checklists']
# record the time this command was run
fields['time_last_dumped_to_neo4j'] = str(timezone.now())
return fields, block_type
def coerce_types(value):
"""
Args:
value: the value of an xblock's field
Returns: either the value, a text version of the value, or, if the
value is a list, a list where each element is converted to text.
"""
coerced_value = value
if isinstance(value, list):
coerced_value = [str(element) for element in coerced_value]
# if it's not one of the types that neo4j accepts,
# just convert it to text
elif not isinstance(value, PRIMITIVE_NEO4J_TYPES):
coerced_value = str(value)
return coerced_value
def add_to_transaction(neo4j_entities, transaction):
"""
Args:
neo4j_entities: a list of Nodes or Relationships
transaction: a neo4j transaction
"""
for entity in neo4j_entities:
transaction.create(entity)
def get_command_last_run(course_key, graph):
"""
This information is stored on the course node of a course in neo4j
Args:
course_key: a CourseKey
graph: a py2neo Graph
Returns: The datetime that the command was last run, converted into
text, or None, if there's no record of this command last being run.
"""
matcher = NodeMatcher(graph)
course_node = matcher.match(
"course",
course_key=str(course_key)
).first()
last_this_command_was_run = None
if course_node:
last_this_command_was_run = course_node['time_last_dumped_to_neo4j']
return last_this_command_was_run
def get_course_last_published(course_key):
"""
We use the CourseStructure table to get when this course was last
published.
Args:
course_key: a CourseKey
Returns: The datetime the course was last published at, converted into
text, or None, if there's no record of the last time this course
was published.
"""
# Import is placed here to avoid model import at project startup.
from xmodule.modulestore.django import modulestore
from openedx.core.djangoapps.content.block_structure.models import BlockStructureModel
from openedx.core.djangoapps.content.block_structure.exceptions import BlockStructureNotFound
store = modulestore()
course_usage_key = store.make_course_usage_key(course_key)
try:
structure = BlockStructureModel.get(course_usage_key)
course_last_published_date = str(structure.modified)
except BlockStructureNotFound:
course_last_published_date = None
return course_last_published_date
def strip_branch_and_version(location):
"""
Removes the branch and version information from a location.
Args:
location: an xblock's location.
Returns: that xblock's location without branch and version information.
"""
return location.for_branch(None)
def serialize_course(course_id):
"""
Serializes a course into py2neo Nodes and Relationships
Args:
course_id: CourseKey of the course we want to serialize
Returns:
nodes: a list of py2neo Node objects
relationships: a list of py2neo Relationships objects
"""
# Import is placed here to avoid model import at project startup.
from xmodule.modulestore.django import modulestore
# create a location to node mapping we'll need later for
# writing relationships
location_to_node = {}
items = modulestore().get_items(course_id)
# create nodes
for item in items:
fields, block_type = serialize_item(item)
for field_name, value in fields.items():
fields[field_name] = coerce_types(value)
node = Node(block_type, 'item', **fields)
location_to_node[strip_branch_and_version(item.location)] = node
# create relationships
relationships = []
for item in items:
previous_child_node = None
for index, child in enumerate(item.get_children()):
parent_node = location_to_node.get(strip_branch_and_version(item.location))
child_node = location_to_node.get(strip_branch_and_version(child.location))
if parent_node is not None and child_node is not None:
child_node["index"] = index
relationship = Relationship(parent_node, "PARENT_OF", child_node)
relationships.append(relationship)
if previous_child_node:
ordering_relationship = Relationship(
previous_child_node,
"PRECEDES",
child_node,
)
relationships.append(ordering_relationship)
previous_child_node = child_node
nodes = list(location_to_node.values())
return nodes, relationships
def should_dump_course(course_key, graph):
"""
Only dump the course if it's been changed since the last time it's been
dumped.
Args:
course_key: a CourseKey object.
graph: a py2neo Graph object.
Returns: bool of whether this course should be dumped to neo4j.
"""
last_this_command_was_run = get_command_last_run(course_key, graph)
course_last_published_date = get_course_last_published(course_key)
# if we don't have a record of the last time this command was run,
# we should serialize the course and dump it
if last_this_command_was_run is None:
return True
# if we've serialized the course recently and we have no published
# events, we will not dump it, and so we can skip serializing it
# again here
if last_this_command_was_run and course_last_published_date is None:
return False
# otherwise, serialize and dump the course if the command was run
# before the course's last published event
return last_this_command_was_run < course_last_published_date
@shared_task
@set_code_owner_attribute
def dump_course_to_neo4j(course_key_string, credentials):
"""
Serializes a course and writes it to neo4j.
Arguments:
course_key: course key for the course to be exported
credentials (dict): the necessary credentials to connect
to neo4j and create a py2neo `Graph` obje
"""
course_key = CourseKey.from_string(course_key_string)
nodes, relationships = serialize_course(course_key)
celery_log.info(
"Now dumping %s to neo4j: %d nodes and %d relationships",
course_key,
len(nodes),
len(relationships),
)
graph = authenticate_and_create_graph(credentials)
transaction = graph.begin()
course_string = str(course_key)
try:
# first, delete existing course
transaction.run(
"MATCH (n:item) WHERE n.course_key='{}' DETACH DELETE n".format(
course_string
)
)
# now, re-add it
add_to_transaction(nodes, transaction)
add_to_transaction(relationships, transaction)
graph.commit(transaction)
celery_log.info("Completed dumping %s to neo4j", course_key)
except Exception: # pylint: disable=broad-except
celery_log.exception(
"Error trying to dump course %s to neo4j, rolling back",
course_string
)
graph.rollback(transaction)
class ModuleStoreSerializer:
"""
Class with functionality to serialize a modulestore into subgraphs,
one graph per course.
"""
def __init__(self, course_keys):
self.course_keys = course_keys
@classmethod
def create(cls, courses=None, skip=None):
"""
Sets the object's course_keys attribute from the `courses` parameter.
If that parameter isn't furnished, loads all course_keys from the
modulestore.
Filters out course_keys in the `skip` parameter, if provided.
Args:
courses: A list of string serializations of course keys.
For example, ["course-v1:org+course+run"].
skip: Also a list of string serializations of course keys.
"""
# Import is placed here to avoid model import at project startup.
from xmodule.modulestore.django import modulestore
if courses:
course_keys = [CourseKey.from_string(course.strip()) for course in courses]
else:
course_keys = [
course.id for course in modulestore().get_course_summaries()
]
if skip is not None:
skip_keys = [CourseKey.from_string(course.strip()) for course in skip]
course_keys = [course_key for course_key in course_keys if course_key not in skip_keys]
return cls(course_keys)
def dump_courses_to_neo4j(self, credentials, override_cache=False):
"""
Method that iterates through a list of courses in a modulestore,
serializes them, then submits tasks to write them to neo4j.
Arguments:
credentials (dict): the necessary credentials to connect
to neo4j and create a py2neo `Graph` object
override_cache: serialize the courses even if they'be been recently
serialized
Returns: two lists--one of the courses that were successfully written
to neo4j and one of courses that were not.
"""
total_number_of_courses = len(self.course_keys)
submitted_courses = []
skipped_courses = []
graph = authenticate_and_create_graph(credentials)
for index, course_key in enumerate(self.course_keys):
# first, clear the request cache to prevent memory leaks
RequestCache.clear_all_namespaces()
log.info(
"Now submitting %s for export to neo4j: course %d of %d total courses",
course_key,
index + 1,
total_number_of_courses,
)
if not (override_cache or should_dump_course(course_key, graph)):
log.info("skipping submitting %s, since it hasn't changed", course_key)
skipped_courses.append(str(course_key))
continue
dump_course_to_neo4j.apply_async(
args=[str(course_key), credentials],
)
submitted_courses.append(str(course_key))
return submitted_courses, skipped_courses
def authenticate_and_create_graph(credentials):
"""
This function authenticates with neo4j and creates a py2neo graph object
Arguments:
credentials (dict): a dictionary of credentials used to authenticate,
and then create, a py2neo graph object.
Returns: a py2neo `Graph` object.
"""
host = credentials['host']
port = credentials['port']
secure = credentials['secure']
neo4j_user = credentials['user']
neo4j_password = credentials['password']
graph = Graph(
protocol='bolt',
password=neo4j_password,
user=neo4j_user,
address=host,
port=port,
secure=secure,
)
return graph