Merge pull request #13103 from edx/dump-to-neo4j-py2neo
dump item information to neo4j
This commit is contained in:
211
lms/djangoapps/courseware/management/commands/dump_to_neo4j.py
Normal file
211
lms/djangoapps/courseware/management/commands/dump_to_neo4j.py
Normal file
@@ -0,0 +1,211 @@
|
||||
"""
|
||||
This file contains a management command for exporting the modulestore to
|
||||
neo4j, a graph database.
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.utils import six
|
||||
from py2neo import Graph, Node, Relationship, authenticate
|
||||
from py2neo.compat import integer, string, unicode as neo4j_unicode
|
||||
from request_cache.middleware import RequestCache
|
||||
from xmodule.modulestore.django import modulestore
|
||||
|
||||
log = logging.getLogger(__name__)

# The neo4j bolt logger proved chatty during local testing, so restrict it
# to records at ERROR level and above.
bolt_log = logging.getLogger('neo4j.bolt')  # pylint: disable=invalid-name
bolt_log.setLevel(logging.ERROR)

# Types that ModuleStoreSerializer.coerce_types passes through unchanged.
PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool)

# Container types whose elements coerce_types converts to text one by one.
ITERABLE_NEO4J_TYPES = (tuple, list, set, frozenset)
|
||||
|
||||
|
||||
class ModuleStoreSerializer(object):
    """
    Turns the contents of the modulestore into py2neo Nodes and
    Relationships, one course at a time.
    """
    def __init__(self):
        # Course summaries for everything in the modulestore; the management
        # command iterates over these and reads each summary's `id`.
        self.all_courses = modulestore().get_course_summaries()

    @staticmethod
    def serialize_item(item):
        """
        Flatten one XBlock into a property dictionary and a label.

        Args:
            item: an XBlock

        Returns:
            fields: a dictionary of the XBlock's field names and values
                (minus 'parent' and 'children'), plus the edit time, display
                name and course key information
            label: the name of the XBlock's type (i.e. 'course'
                or 'problem')
        """
        # 'parent' and 'children' are left out of the field dictionary
        skipped_fields = ('parent', 'children')
        fields = {}
        for name, field in six.iteritems(item.fields):
            if name in skipped_fields:
                continue
            fields[name] = field.read_from(item)

        course_key = item.scope_ids.usage_id.course_key

        # set or reset some defaults
        fields.update({
            'edited_on': six.text_type(getattr(item, 'edited_on', '')),
            'display_name': item.display_name_with_default,
            'org': course_key.org,
            'course': course_key.course,
            'run': course_key.run,
            'course_key': six.text_type(course_key),
        })

        label = item.scope_ids.block_type

        # course blocks are stored without their 'checklists' field
        if label == 'course':
            fields.pop('checklists', None)

        return fields, label

    def serialize_course(self, course_id):
        """
        Serializes a course into Nodes and Relationships.

        Args:
            course_id: CourseKey of the course we want to serialize

        Returns:
            nodes: a list of py2neo Node objects, one per item in the course
            relationships: a list of py2neo Relationship objects, one
                "PARENT_OF" edge per parent/child pair whose nodes are both
                known
        """
        items = modulestore().get_items(course_id)

        # Remember which node belongs to which location so the relationship
        # pass below can look up both endpoints.
        location_to_node = {}
        nodes = []
        for item in items:
            raw_fields, label = self.serialize_item(item)
            properties = dict(
                (name, self.coerce_types(raw_value))
                for name, raw_value in six.iteritems(raw_fields)
            )
            node = Node(label, 'item', **properties)
            nodes.append(node)
            location_to_node[item.location] = node

        relationships = []
        for item in items:
            parent_node = location_to_node.get(item.location)
            for child in item.get_children():
                child_node = location_to_node.get(child.location)
                if parent_node is None or child_node is None:
                    # one endpoint was never serialized; nothing to link
                    continue
                relationships.append(
                    Relationship(parent_node, "PARENT_OF", child_node)
                )

        return nodes, relationships

    @staticmethod
    def coerce_types(value):
        """
        Convert a field value into something accepted as a node property.

        Args:
            value: the value of an xblock's field

        Returns: the value itself when it is one of PRIMITIVE_NEO4J_TYPES;
            for one of ITERABLE_NEO4J_TYPES, a container of the same type
            with every element converted to text; otherwise a text version
            of the value
        """
        if isinstance(value, ITERABLE_NEO4J_TYPES):
            # rebuild with the original container type after converting
            # every element to text
            return type(value)([six.text_type(element) for element in value])

        if isinstance(value, PRIMITIVE_NEO4J_TYPES):
            return value

        # not a type that neo4j accepts, so fall back to its text form
        return six.text_type(value)
|
||||
|
||||
|
||||
class Command(BaseCommand):
    """
    Command to dump modulestore data to neo4j
    """

    @staticmethod
    def add_to_transaction(neo4j_entities, transaction):
        """
        Calls `transaction.create` on every entity, in order.

        Args:
            neo4j_entities: a list of Nodes or Relationships
            transaction: a neo4j transaction
        """
        for entity in neo4j_entities:
            transaction.create(entity)

    def handle(self, *args, **options):  # pylint: disable=unused-argument
        """
        Iterates through each course, serializes them into graphs, and saves
        those graphs to neo4j.

        Every course gets its own transaction: the course's existing nodes
        are deleted, the freshly serialized nodes and relationships are
        created, and the transaction is committed. If anything in that
        sequence raises, the error is logged, the transaction is rolled back
        and the command moves on to the next course.

        Raises:
            CommandError: if settings.NEO4J_CONFIG is None
        """
        # first, make sure that there's a valid neo4j configuration
        if settings.NEO4J_CONFIG is None:
            raise CommandError(
                "No neo4j configuration (NEO4J_CONFIG) defined in lms.auth.json."
            )

        auth_params = ["{host}:{https_port}", "{user}", "{password}"]
        authenticate(*[param.format(**settings.NEO4J_CONFIG) for param in auth_params])

        graph = Graph(**settings.NEO4J_CONFIG)

        mss = ModuleStoreSerializer()

        total_number_of_courses = len(mss.all_courses)

        # The course key is sent as a query parameter instead of being
        # interpolated into the Cypher text, so a key containing a quote
        # cannot break or alter the statement. `{course_key}` is Cypher's
        # parameter placeholder, not a str.format field.
        delete_course_query = (
            "MATCH (n:item) WHERE n.course_key = {course_key} DETACH DELETE n"
        )

        for index, course in enumerate(mss.all_courses):
            # first, clear the request cache to prevent memory leaks
            RequestCache.clear_request_cache()

            log.info(
                "Now exporting %s to neo4j: course %d of %d total courses",
                course.id,
                index + 1,
                total_number_of_courses
            )
            nodes, relationships = mss.serialize_course(course.id)
            log.info(
                "%d nodes and %d relationships in %s",
                len(nodes),
                len(relationships),
                course.id
            )

            transaction = graph.begin()
            try:
                # first, delete existing course
                transaction.run(
                    delete_course_query,
                    {'course_key': six.text_type(course.id)}
                )

                # now, re-add it
                self.add_to_transaction(nodes, transaction)
                self.add_to_transaction(relationships, transaction)
                transaction.commit()

            except Exception:  # pylint: disable=broad-except
                # Deliberately broad: one failing course should not stop the
                # export of the remaining courses.
                log.exception(
                    "Error trying to dump course %s to neo4j, rolling back",
                    six.text_type(course.id)
                )
                transaction.rollback()
|
||||
@@ -0,0 +1,154 @@
|
||||
# coding=utf-8
|
||||
"""
|
||||
Tests for the dump_to_neo4j management command.
|
||||
"""
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import ddt
|
||||
import mock
|
||||
from courseware.management.commands.dump_to_neo4j import (
|
||||
ModuleStoreSerializer,
|
||||
ITERABLE_NEO4J_TYPES,
|
||||
)
|
||||
from django.core.management import call_command
|
||||
from django.core.management.base import CommandError
|
||||
from django.utils import six
|
||||
from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase
|
||||
from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory
|
||||
|
||||
|
||||
class TestDumpToNeo4jCommandBase(SharedModuleStoreTestCase):
    """
    Shared fixture for the test suites in this file: one course with a
    chapter > sequential > vertical chain holding four leaf blocks, and a
    second course with nothing added to it.
    """
    @classmethod
    def setUpClass(cls):
        super(TestDumpToNeo4jCommandBase, cls).setUpClass()

        # first course: a single chain of containers...
        cls.course = CourseFactory.create()
        cls.chapter = ItemFactory.create(category='chapter', parent=cls.course)
        cls.sequential = ItemFactory.create(category='sequential', parent=cls.chapter)
        cls.vertical = ItemFactory.create(category='vertical', parent=cls.sequential)

        # ...with four leaf blocks inside the vertical
        cls.html = ItemFactory.create(category='html', parent=cls.vertical)
        cls.problem = ItemFactory.create(category='problem', parent=cls.vertical)
        cls.video = ItemFactory.create(category='video', parent=cls.vertical)
        cls.video2 = ItemFactory.create(category='video', parent=cls.vertical)

        # second course: no blocks added
        cls.course2 = CourseFactory.create()
|
||||
|
||||
|
||||
class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase):
    """
    Tests for the dump to neo4j management command
    """
    @staticmethod
    def _stub_transaction(mock_graph_class):
        """
        Make the patched Graph class hand out a single mock transaction.

        Returns:
            (mock_graph, mock_transaction): the mock Graph instance and the
            mock that every `begin()` call on it returns
        """
        mock_graph = mock_graph_class.return_value
        mock_transaction = mock.Mock()
        mock_graph.begin.return_value = mock_transaction
        return mock_graph, mock_transaction

    @mock.patch('courseware.management.commands.dump_to_neo4j.Graph')
    def test_dump_to_neo4j(self, mock_graph_class):
        """
        Tests the dump_to_neo4j management command works against a mock
        py2neo Graph
        """
        mock_graph, mock_transaction = self._stub_transaction(mock_graph_class)

        call_command('dump_to_neo4j')

        # one transaction per course, each committed and none rolled back
        self.assertEqual(mock_graph.begin.call_count, 2)
        self.assertEqual(mock_transaction.commit.call_count, 2)
        self.assertEqual(mock_transaction.rollback.call_count, 0)

        # 9 nodes + 7 relationships from the first course
        # 2 nodes and no relationships from the second
        self.assertEqual(mock_transaction.create.call_count, 18)
        # one delete query per course
        self.assertEqual(mock_transaction.run.call_count, 2)

    @mock.patch('courseware.management.commands.dump_to_neo4j.Graph')
    def test_dump_to_neo4j_rollback(self, mock_graph_class):
        """
        Tests that the management command handles the case where there's
        an exception trying to write to the neo4j database.
        """
        mock_graph, mock_transaction = self._stub_transaction(mock_graph_class)
        mock_transaction.run.side_effect = ValueError('Something went wrong!')

        call_command('dump_to_neo4j')

        # every course still gets a transaction, but each one is rolled back
        self.assertEqual(mock_graph.begin.call_count, 2)
        self.assertEqual(mock_transaction.commit.call_count, 0)
        self.assertEqual(mock_transaction.rollback.call_count, 2)

    @mock.patch('django.conf.settings.NEO4J_CONFIG', None)
    def test_dump_to_neo4j_no_config(self):
        """
        Tests that the command errors out if there isn't a configuration
        file found
        """
        with self.assertRaises(CommandError):
            call_command('dump_to_neo4j')
|
||||
|
||||
|
||||
@ddt.ddt
class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase):
    """
    Tests for the ModuleStoreSerializer
    """
    def setUp(self):
        super(TestModuleStoreSerializer, self).setUp()
        self.modulestore_serializer = ModuleStoreSerializer()

    def test_serialize_item(self):
        """
        Tests the serialize_item method.
        """
        fields, label = self.modulestore_serializer.serialize_item(self.course)
        self.assertEqual(label, "course")
        self.assertIn("edited_on", fields)
        self.assertIn("display_name", fields)
        self.assertIn("org", fields)
        self.assertIn("course", fields)
        self.assertIn("run", fields)
        self.assertIn("course_key", fields)
        # serialize_item prunes the "checklists" field (plural) from course
        # blocks; checking the singular spelling could never fail.
        self.assertNotIn("checklists", fields)

    def test_serialize_course(self):
        """
        Tests the serialize_course method.
        """
        nodes, relationships = self.modulestore_serializer.serialize_course(
            self.course.id
        )
        self.assertEqual(len(nodes), 9)
        self.assertEqual(len(relationships), 7)

    @ddt.data(*ITERABLE_NEO4J_TYPES)
    def test_coerce_types_iterable(self, iterable_type):
        """
        Tests the coerce_types helper method for iterable types
        """
        example_iterable = iterable_type([object, object, object])

        # each element in the iterable is not unicode:
        self.assertFalse(any(isinstance(tab, six.text_type) for tab in example_iterable))
        # but after they are coerced, they are:
        coerced = self.modulestore_serializer.coerce_types(example_iterable)
        self.assertTrue(all(isinstance(tab, six.text_type) for tab in coerced))
        # finally, make sure we haven't changed the type:
        self.assertEqual(type(coerced), iterable_type)

    @ddt.data(
        (1, 1),
        (object, "<type 'object'>"),
        (1.5, 1.5),
        ("úñîçø∂é", "úñîçø∂é"),
        (b"plain string", b"plain string"),
        (True, True),
        (None, "None"),
    )
    @ddt.unpack
    def test_coerce_types_base(self, original_value, coerced_expected):
        """
        Tests the coerce_types helper for the neo4j base types
        """
        coerced_value = self.modulestore_serializer.coerce_types(original_value)
        self.assertEqual(coerced_value, coerced_expected)
|
||||
@@ -863,3 +863,6 @@ API_ACCESS_FROM_EMAIL = ENV_TOKENS.get('API_ACCESS_FROM_EMAIL')
|
||||
APP_UPGRADE_CACHE_TIMEOUT = ENV_TOKENS.get('APP_UPGRADE_CACHE_TIMEOUT', APP_UPGRADE_CACHE_TIMEOUT)
|
||||
|
||||
AFFILIATE_COOKIE_NAME = ENV_TOKENS.get('AFFILIATE_COOKIE_NAME', AFFILIATE_COOKIE_NAME)
|
||||
|
||||
############## Settings for Neo4j ############################
|
||||
NEO4J_CONFIG = AUTH_TOKENS.get('NEO4J_CONFIG')
|
||||
|
||||
@@ -2960,3 +2960,8 @@ AFFILIATE_COOKIE_NAME = 'affiliate_id'
|
||||
# The cache is cleared when Redirect models are saved/deleted
|
||||
REDIRECT_CACHE_TIMEOUT = None # The length of time we cache Redirect model data
|
||||
REDIRECT_CACHE_KEY_PREFIX = 'redirects'
|
||||
|
||||
############## Settings for Neo4j ############################
|
||||
|
||||
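# Connection settings for neo4j. This should be set in configuration; while
# it is None, the dump_to_neo4j management command exits with a CommandError.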
|
||||
NEO4J_CONFIG = None
|
||||
|
||||
@@ -590,3 +590,13 @@ COURSE_CATALOG_API_URL = 'https://catalog.example.com/api/v1'
|
||||
COMPREHENSIVE_THEME_DIRS = [REPO_ROOT / "themes", REPO_ROOT / "common/test"]
|
||||
|
||||
LMS_ROOT_URL = "http://localhost:8000"
|
||||
|
||||
# Test configuration for neo4j
NEO4J_CONFIG = {
    # formatted into the arguments of py2neo's authenticate() by the
    # dump_to_neo4j command
    'host': 'localhost',
    'https_port': 7473,
    'user': 'neo4j',
    'password': 'password',
    # the whole dict, including these flags, is passed on to Graph(**...)
    'bolt': True,
    'secure': True,
}
|
||||
|
||||
@@ -186,3 +186,7 @@ sailthru-client==2.2.3
|
||||
|
||||
# Release utils for the edx release pipeline
|
||||
edx-django-release-util==0.1.0
|
||||
|
||||
# Used to communicate with Neo4j, which is used internally for
|
||||
# modulestore inspection
|
||||
py2neo==3.1.2
|
||||
|
||||
Reference in New Issue
Block a user