diff --git a/cms/envs/aws.py b/cms/envs/aws.py index c217d2aa81..b41460fa3d 100644 --- a/cms/envs/aws.py +++ b/cms/envs/aws.py @@ -498,3 +498,6 @@ AFFILIATE_COOKIE_NAME = ENV_TOKENS.get('AFFILIATE_COOKIE_NAME', AFFILIATE_COOKIE ############## Settings for Studio Context Sensitive Help ############## HELP_TOKENS_BOOKS = ENV_TOKENS.get('HELP_TOKENS_BOOKS', HELP_TOKENS_BOOKS) + +############## Settings for CourseGraph ############################ +COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE diff --git a/cms/envs/common.py b/cms/envs/common.py index 84d171c8d9..602e8fb1f2 100644 --- a/cms/envs/common.py +++ b/cms/envs/common.py @@ -1315,3 +1315,6 @@ COURSE_CATALOG_API_URL = None # Queue to use for updating persistent grades RECALCULATE_GRADES_ROUTING_KEY = LOW_PRIORITY_QUEUE + +############## Settings for CourseGraph ############################ +COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE diff --git a/lms/envs/aws.py b/lms/envs/aws.py index ad4018200d..d689475dbb 100644 --- a/lms/envs/aws.py +++ b/lms/envs/aws.py @@ -999,3 +999,6 @@ COURSES_API_CACHE_TIMEOUT = ENV_TOKENS.get('COURSES_API_CACHE_TIMEOUT', COURSES_ # Add an ICP license for serving content in China if your organization is registered to do so ICP_LICENSE = ENV_TOKENS.get('ICP_LICENSE', None) + +############## Settings for CourseGraph ############################ +COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE diff --git a/lms/envs/common.py b/lms/envs/common.py index ad435f1421..dc8fe073f0 100644 --- a/lms/envs/common.py +++ b/lms/envs/common.py @@ -3190,3 +3190,6 @@ COURSE_ENROLLMENT_MODES = { ############## Settings for the Discovery App ###################### COURSES_API_CACHE_TIMEOUT = 3600 # Value is in seconds + +############## Settings for CourseGraph ############################ +COURSEGRAPH_JOB_QUEUE = LOW_PRIORITY_QUEUE diff --git a/openedx/core/djangoapps/coursegraph/apps.py b/openedx/core/djangoapps/coursegraph/apps.py index 11524aa240..3af16c005b 100644 --- a/openedx/core/djangoapps/coursegraph/apps.py +++ b/openedx/core/djangoapps/coursegraph/apps.py @@ -12,3 +12,5 @@ class CoursegraphConfig(AppConfig): AppConfig for courseware app """ name = 'openedx.core.djangoapps.coursegraph' + + from . import tasks # pylint: disable=unused-variable diff --git a/openedx/core/djangoapps/coursegraph/management/commands/dump_to_neo4j.py b/openedx/core/djangoapps/coursegraph/management/commands/dump_to_neo4j.py index d275cc7bec..1af006432e 100644 --- a/openedx/core/djangoapps/coursegraph/management/commands/dump_to_neo4j.py +++ b/openedx/core/djangoapps/coursegraph/management/commands/dump_to_neo4j.py @@ -7,334 +7,12 @@ from __future__ import unicode_literals, print_function import logging from django.core.management.base import BaseCommand -from django.utils import six, timezone -from opaque_keys.edx.keys import CourseKey -from py2neo import Graph, Node, Relationship, authenticate, NodeSelector -from py2neo.compat import integer, string, unicode as neo4j_unicode -from request_cache.middleware import RequestCache -from xmodule.modulestore.django import modulestore -from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES +from django.utils import six -from openedx.core.djangoapps.content.course_structures.models import CourseStructure +from openedx.core.djangoapps.coursegraph.tasks import ModuleStoreSerializer log = logging.getLogger(__name__) -# When testing locally, neo4j's bolt logger was noisy, so we'll only have it -# emit logs if there's an error. -bolt_log = logging.getLogger('neo4j.bolt') # pylint: disable=invalid-name -bolt_log.setLevel(logging.ERROR) - -PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool) - - -class ModuleStoreSerializer(object): - """ - Class with functionality to serialize a modulestore into subgraphs, - one graph per course. - """ - - def __init__(self, course_keys): - self.course_keys = course_keys - - @classmethod - def create(cls, courses=None, skip=None): - """ - Sets the object's course_keys attribute from the `courses` parameter. - If that parameter isn't furnished, loads all course_keys from the - modulestore. - - Filters out course_keys in the `skip` parameter, if provided. - - Arguments: - courses: A list of string serializations of course keys. - For example, ["course-v1:org+course+run"]. - skip: Also a list of string serializations of course keys. - - Returns: - a ModulestoreSerializer instance - """ - if courses: - course_keys = [CourseKey.from_string(course.strip()) for course in courses] - else: - course_keys = [ - course.id for course in modulestore().get_course_summaries() - ] - if skip is not None: - skip_keys = set([CourseKey.from_string(course.strip()) for course in skip]) - course_keys = [course_key for course_key in course_keys if course_key not in skip_keys] - return cls(course_keys) - - @staticmethod - def serialize_item(item): - """ - Arguments: - item: an XBlock - - Returns: - fields: a dictionary of an XBlock's field names and values - block_type: the name of the XBlock's type (i.e. 'course' - or 'problem') - """ - # convert all fields to a dict and filter out parent and children field - fields = dict( - (field, field_value.read_from(item)) - for (field, field_value) in six.iteritems(item.fields) - if field not in ['parent', 'children'] - ) - - course_key = item.scope_ids.usage_id.course_key - block_type = item.scope_ids.block_type - - # set or reset some defaults - fields['edited_on'] = six.text_type(getattr(item, 'edited_on', '')) - fields['display_name'] = item.display_name_with_default - fields['org'] = course_key.org - fields['course'] = course_key.course - fields['run'] = course_key.run - fields['course_key'] = six.text_type(course_key) - fields['location'] = six.text_type(item.location) - fields['block_type'] = block_type - fields['detached'] = block_type in DETACHED_XBLOCK_TYPES - - if block_type == 'course': - # prune the checklists field - if 'checklists' in fields: - del fields['checklists'] - - # record the time this command was run - fields['time_last_dumped_to_neo4j'] = six.text_type(timezone.now()) - - return fields, block_type - - def serialize_course(self, course_id): - """ - Serializes a course into py2neo Nodes and Relationships - Arguments: - course_id: CourseKey of the course we want to serialize - - Returns: - nodes: a list of py2neo Node objects - relationships: a list of py2neo Relationships objects - """ - # create a location to node mapping we'll need later for - # writing relationships - location_to_node = {} - items = modulestore().get_items(course_id) - - # create nodes - for item in items: - fields, block_type = self.serialize_item(item) - - for field_name, value in six.iteritems(fields): - fields[field_name] = self.coerce_types(value) - - node = Node(block_type, 'item', **fields) - location_to_node[item.location] = node - - # create relationships - relationships = [] - for item in items: - previous_child_node = None - for index, child_loc in enumerate(item.get_children()): - parent_node = location_to_node.get(item.location) - child_node = location_to_node.get(child_loc.location) - child_node["index"] = index - if parent_node is not None and child_node is not None: - relationship = Relationship(parent_node, "PARENT_OF", child_node) - relationships.append(relationship) - - if previous_child_node: - ordering_relationship = Relationship( - previous_child_node, "PRECEDES", child_node - ) - relationships.append(ordering_relationship) - previous_child_node = child_node - - nodes = location_to_node.values() - return nodes, relationships - - @staticmethod - def coerce_types(value): - """ - Arguments: - value: the value of an xblock's field - - Returns: either the value, a text version of the value, or, if the - value is a list, a list where each element is converted to text. - """ - coerced_value = value - if isinstance(value, list): - coerced_value = [six.text_type(element) for element in coerced_value] - - # if it's not one of the types that neo4j accepts, - # just convert it to text - elif not isinstance(value, PRIMITIVE_NEO4J_TYPES): - coerced_value = six.text_type(value) - - return coerced_value - - @staticmethod - def add_to_transaction(neo4j_entities, transaction): - """ - Arguments: - neo4j_entities: a list of Nodes or Relationships - transaction: a neo4j transaction - """ - for entity in neo4j_entities: - transaction.create(entity) - - @staticmethod - def get_command_last_run(course_key, graph): - """ - This information is stored on the course node of a course in neo4j - Arguments: - course_key: a CourseKey - graph: a py2neo Graph - - Returns: The datetime that the command was last run, converted into - text, or None, if there's no record of this command last being run. - - """ - selector = NodeSelector(graph) - course_node = selector.select( - "course", - course_key=six.text_type(course_key) - ).first() - - last_this_command_was_run = None - if course_node: - last_this_command_was_run = course_node['time_last_dumped_to_neo4j'] - - return last_this_command_was_run - - @staticmethod - def get_course_last_published(course_key): - """ - We use the CourseStructure table to get when this course was last - published. - Arguments: - course_key: a CourseKey - - Returns: The datetime the course was last published at, converted into - text, or None, if there's no record of the last time this course - was published. - """ - try: - structure = CourseStructure.objects.get(course_id=course_key) - course_last_published_date = six.text_type(structure.modified) - except CourseStructure.DoesNotExist: - course_last_published_date = None - - return course_last_published_date - - def should_dump_course(self, course_key, graph): - """ - Only dump the course if it's been changed since the last time it's been - dumped. - Arguments: - course_key: a CourseKey object. - graph: a py2neo Graph object. - - Returns: bool of whether this course should be dumped to neo4j. - """ - - last_this_command_was_run = self.get_command_last_run(course_key, graph) - - course_last_published_date = self.get_course_last_published(course_key) - - # if we don't have a record of the last time this command was run, - # we should serialize the course and dump it - if last_this_command_was_run is None: - return True - - # if we've serialized the course recently and we have no published - # events, we will not dump it, and so we can skip serializing it - # again here - if last_this_command_was_run and course_last_published_date is None: - return False - - # otherwise, serialize and dump the course if the command was run - # before the course's last published event - return last_this_command_was_run < course_last_published_date - - def dump_course_to_neo4j(self, course_key, graph): - """ - Serializes a course and writes it to neo4j. - - Arguments: - course_key: course key for the course to be exported - graph: py2neo graph object - """ - - nodes, relationships = self.serialize_course(course_key) - log.info( - "%d nodes and %d relationships in %s", - len(nodes), - len(relationships), - course_key, - ) - - transaction = graph.begin() - course_string = six.text_type(course_key) - try: - # first, delete existing course - transaction.run( - "MATCH (n:item) WHERE n.course_key='{}' DETACH DELETE n".format( - course_string - ) - ) - - # now, re-add it - self.add_to_transaction(nodes, transaction) - self.add_to_transaction(relationships, transaction) - transaction.commit() - - except Exception: # pylint: disable=broad-except - log.exception( - "Error trying to dump course %s to neo4j, rolling back", - course_string - ) - transaction.rollback() - - def dump_courses_to_neo4j(self, graph, override_cache=False): - """ - Method that iterates through a list of courses in a modulestore, - serializes them, then submits tasks to write them to neo4j. - Arguments: - graph: py2neo graph object - override_cache: serialize the courses even if they'be been recently - serialized - - Returns: two lists--one of the courses that were successfully written - to neo4j and one of courses that were not. - """ - - total_number_of_courses = len(self.course_keys) - - submitted_courses = [] - skipped_courses = [] - - for index, course_key in enumerate(self.course_keys): - # first, clear the request cache to prevent memory leaks - RequestCache.clear_request_cache() - - log.info( - "Now exporting %s to neo4j: course %d of %d total courses", - course_key, - index + 1, - total_number_of_courses, - ) - - if not (override_cache or self.should_dump_course(course_key, graph)): - log.info("skipping dumping %s, since it hasn't changed", course_key) - skipped_courses.append(unicode(course_key)) - - else: - self.dump_course_to_neo4j(course_key, graph) - submitted_courses.append(unicode(course_key)) - - return submitted_courses, skipped_courses - class Command(BaseCommand): """ @@ -377,33 +55,11 @@ class Command(BaseCommand): Iterates through each course, serializes them into graphs, and saves those graphs to neo4j. """ - host = options['host'] - https_port = options['https_port'] - http_port = options['http_port'] - secure = options['secure'] - neo4j_user = options['user'] - neo4j_password = options['password'] - - authenticate( - "{host}:{port}".format(host=host, port=https_port if secure else http_port), - neo4j_user, - neo4j_password, - ) - - graph = Graph( - bolt=True, - password=neo4j_password, - user=neo4j_user, - https_port=https_port, - http_port=http_port, - host=host, - secure=secure, - ) mss = ModuleStoreSerializer.create(options['courses'], options['skip']) submitted_courses, skipped_courses = mss.dump_courses_to_neo4j( - graph, override_cache=options['override'] + options, override_cache=options['override'] ) log.info( @@ -413,7 +69,7 @@ class Command(BaseCommand): ) if not submitted_courses: - print("No courses exported to neo4j at all!") + print("No courses submitted for export to neo4j at all!") return if submitted_courses: diff --git a/openedx/core/djangoapps/coursegraph/management/commands/tests/test_dump_to_neo4j.py b/openedx/core/djangoapps/coursegraph/management/commands/tests/test_dump_to_neo4j.py index 1a18fbcd3f..1121d78f07 100644 --- a/openedx/core/djangoapps/coursegraph/management/commands/tests/test_dump_to_neo4j.py +++ b/openedx/core/djangoapps/coursegraph/management/commands/tests/test_dump_to_neo4j.py @@ -14,12 +14,18 @@ from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase from xmodule.modulestore.tests.factories import CourseFactory, ItemFactory from openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j import ( - ModuleStoreSerializer, + ModuleStoreSerializer ) from openedx.core.djangoapps.coursegraph.management.commands.tests.utils import ( MockGraph, MockNodeSelector, ) +from openedx.core.djangoapps.coursegraph.tasks import ( + serialize_item, + serialize_course, + coerce_types, + should_dump_course, +) from openedx.core.djangoapps.content.course_structures.signals import ( listen_for_course_publish ) @@ -109,8 +115,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): Tests for the dump to neo4j management command """ - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph') @ddt.data(1, 2) def test_dump_specific_courses(self, number_of_courses, mock_graph_class, mock_selector_class): """ @@ -134,8 +140,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): number_rollbacks=0 ) - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph') def test_dump_skip_course(self, mock_graph_class, mock_selector_class): """ Test that you can skip courses. @@ -160,8 +166,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): number_rollbacks=0, ) - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph') def test_dump_skip_beats_specifying(self, mock_graph_class, mock_selector_class): """ Test that if you skip and specify the same course, you'll skip it. @@ -187,8 +193,8 @@ class TestDumpToNeo4jCommand(TestDumpToNeo4jCommandBase): number_rollbacks=0, ) - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.Graph') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.Graph') def test_dump_all_courses(self, mock_graph_class, mock_selector_class): """ Test if you don't specify which courses to dump, then you'll dump @@ -229,7 +235,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): """ Tests the serialize_item method. """ - fields, label = self.mss.serialize_item(self.course) + fields, label = serialize_item(self.course) self.assertEqual(label, "course") self.assertIn("edited_on", fields.keys()) self.assertIn("display_name", fields.keys()) @@ -246,7 +252,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): """ Tests the serialize_course method. """ - nodes, relationships = self.mss.serialize_course(self.course.id) + nodes, relationships = serialize_course(self.course.id) self.assertEqual(len(nodes), 9) # the course has 7 "PARENT_OF" relationships and 3 "PRECEDES" self.assertEqual(len(relationships), 10) @@ -282,7 +288,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): Returns: A tuple of the string representations of those XBlocks' locations. """ - return (unicode(xblock1.location), unicode(xblock2.location)) + return (six.text_type(xblock1.location), six.text_type(xblock2.location)) def assertBlockPairIsRelationship(self, xblock1, xblock2, relationships, relationship_type): """ @@ -306,7 +312,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): """ Tests that two nodes that should have a precedes relationship have it. """ - __, relationships = self.mss.serialize_course(self.course.id) + __, relationships = serialize_course(self.course.id) self.assertBlockPairIsRelationship(self.video, self.video2, relationships, "PRECEDES") self.assertBlockPairIsNotRelationship(self.video2, self.video, relationships, "PRECEDES") self.assertBlockPairIsNotRelationship(self.vertical, self.video, relationships, "PRECEDES") @@ -316,7 +322,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): """ Test that two nodes that should have a parent_of relationship have it. """ - __, relationships = self.mss.serialize_course(self.course.id) + __, relationships = serialize_course(self.course.id) self.assertBlockPairIsRelationship(self.vertical, self.video, relationships, "PARENT_OF") self.assertBlockPairIsRelationship(self.vertical, self.html, relationships, "PARENT_OF") self.assertBlockPairIsRelationship(self.course, self.chapter, relationships, "PARENT_OF") @@ -328,7 +334,7 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): """ Test that we add index values on nodes """ - nodes, relationships = self.mss.serialize_course(self.course.id) + nodes, relationships = serialize_course(self.course.id) # the html node should have 0 index, and the problem should have 1 html_nodes = [node for node in nodes if node['block_type'] == 'html'] @@ -359,19 +365,22 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): """ Tests the coerce_types helper """ - coerced_value = self.mss.coerce_types(original_value) + coerced_value = coerce_types(original_value) self.assertEqual(coerced_value, coerced_expected) - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') - def test_dump_to_neo4j(self, mock_selector_class): + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph') + def test_dump_to_neo4j(self, mock_graph_constructor, mock_selector_class): """ Tests the dump_to_neo4j method works against a mock py2neo Graph """ mock_graph = MockGraph() + mock_graph_constructor.return_value = mock_graph mock_selector_class.return_value = MockNodeSelector(mock_graph) + mock_credentials = mock.Mock() - submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph) + submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials) self.assertCourseDump( mock_graph, @@ -386,16 +395,19 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): self.assertEqual(len(mock_graph.nodes), 11) self.assertItemsEqual(submitted, self.course_strings) - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') - def test_dump_to_neo4j_rollback(self, mock_selector_class): + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph') + def test_dump_to_neo4j_rollback(self, mock_graph_constructor, mock_selector_class): """ Tests that the the dump_to_neo4j method handles the case where there's an exception trying to write to the neo4j database. """ mock_graph = MockGraph(transaction_errors=True) + mock_graph_constructor.return_value = mock_graph mock_selector_class.return_value = MockNodeSelector(mock_graph) + mock_credentials = mock.Mock() - submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph) + submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials) self.assertCourseDump( mock_graph, @@ -406,50 +418,64 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): self.assertItemsEqual(submitted, self.course_strings) - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph') @ddt.data((True, 2), (False, 0)) @ddt.unpack - def test_dump_to_neo4j_cache(self, override_cache, expected_number_courses, mock_selector_class): + def test_dump_to_neo4j_cache( + self, + override_cache, + expected_number_courses, + mock_graph_constructor, + mock_selector_class, + ): """ Tests the caching mechanism and override to make sure we only publish recently updated courses. """ mock_graph = MockGraph() + mock_graph_constructor.return_value = mock_graph mock_selector_class.return_value = MockNodeSelector(mock_graph) + mock_credentials = mock.Mock() # run once to warm the cache self.mss.dump_courses_to_neo4j( - mock_graph, override_cache=override_cache + mock_credentials, override_cache=override_cache ) # when run the second time, only dump courses if the cache override # is enabled submitted, __ = self.mss.dump_courses_to_neo4j( - mock_graph, override_cache=override_cache + mock_credentials, override_cache=override_cache ) self.assertEqual(len(submitted), expected_number_courses) - @mock.patch('openedx.core.djangoapps.coursegraph.management.commands.dump_to_neo4j.NodeSelector') - def test_dump_to_neo4j_published(self, mock_selector_class): + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.NodeSelector') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.authenticate_and_create_graph') + def test_dump_to_neo4j_published(self, mock_graph_constructor, mock_selector_class): """ Tests that we only dump those courses that have been published after the last time the command was been run. """ mock_graph = MockGraph() + mock_graph_constructor.return_value = mock_graph mock_selector_class.return_value = MockNodeSelector(mock_graph) + mock_credentials = mock.Mock() # run once to warm the cache - submitted, skipped = self.mss.dump_courses_to_neo4j(mock_graph) + submitted, skipped = self.mss.dump_courses_to_neo4j(mock_credentials) self.assertEqual(len(submitted), len(self.course_strings)) # simulate one of the courses being published listen_for_course_publish(None, self.course.id) # make sure only the published course was dumped - submitted, __ = self.mss.dump_courses_to_neo4j(mock_graph) + submitted, __ = self.mss.dump_courses_to_neo4j(mock_credentials) self.assertEqual(len(submitted), 1) - self.assertEqual(submitted[0], unicode(self.course.id)) + self.assertEqual(submitted[0], six.text_type(self.course.id)) + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_course_last_published') + @mock.patch('openedx.core.djangoapps.coursegraph.tasks.get_command_last_run') @ddt.data( (six.text_type(datetime(2016, 3, 30)), six.text_type(datetime(2016, 3, 31)), True), (six.text_type(datetime(2016, 3, 31)), six.text_type(datetime(2016, 3, 30)), False), @@ -458,17 +484,23 @@ class TestModuleStoreSerializer(TestDumpToNeo4jCommandBase): (None, None, True), ) @ddt.unpack - def test_should_dump_course(self, last_command_run, last_course_published, should_dump): + def test_should_dump_course( + self, + last_command_run, + last_course_published, + should_dump, + mock_get_command_last_run, + mock_get_course_last_published, + ): """ Tests whether a course should be dumped given the last time it was dumped and the last time it was published. """ - mss = ModuleStoreSerializer.create() - mss.get_command_last_run = lambda course_key, graph: last_command_run - mss.get_course_last_published = lambda course_key: last_course_published + mock_get_command_last_run.return_value = last_command_run + mock_get_course_last_published.return_value = last_course_published mock_course_key = mock.Mock mock_graph = mock.Mock() self.assertEqual( - mss.should_dump_course(mock_course_key, mock_graph), + should_dump_course(mock_course_key, mock_graph), should_dump, ) diff --git a/openedx/core/djangoapps/coursegraph/tasks.py b/openedx/core/djangoapps/coursegraph/tasks.py new file mode 100644 index 0000000000..ba5709f2c2 --- /dev/null +++ b/openedx/core/djangoapps/coursegraph/tasks.py @@ -0,0 +1,386 @@ +""" +This file contains a management command for exporting the modulestore to +neo4j, a graph database. +""" +from __future__ import unicode_literals, print_function + +import logging + +from celery import task +from django.conf import settings +from django.utils import six, timezone +from opaque_keys.edx.keys import CourseKey +from py2neo import Graph, Node, Relationship, authenticate, NodeSelector +from py2neo.compat import integer, string, unicode as neo4j_unicode +from request_cache.middleware import RequestCache +from xmodule.modulestore.django import modulestore +from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES + +from openedx.core.djangoapps.content.course_structures.models import CourseStructure + +log = logging.getLogger(__name__) +celery_log = logging.getLogger('edx.celery.task') + +# When testing locally, neo4j's bolt logger was noisy, so we'll only have it +# emit logs if there's an error. +bolt_log = logging.getLogger('neo4j.bolt') # pylint: disable=invalid-name +bolt_log.setLevel(logging.ERROR) + +PRIMITIVE_NEO4J_TYPES = (integer, string, neo4j_unicode, float, bool) + + +def serialize_item(item): + """ + Args: + item: an XBlock + + Returns: + fields: a dictionary of an XBlock's field names and values + block_type: the name of the XBlock's type (i.e. 'course' + or 'problem') + """ + # convert all fields to a dict and filter out parent and children field + fields = dict( + (field, field_value.read_from(item)) + for (field, field_value) in six.iteritems(item.fields) + if field not in ['parent', 'children'] + ) + + course_key = item.scope_ids.usage_id.course_key + block_type = item.scope_ids.block_type + + # set or reset some defaults + fields['edited_on'] = six.text_type(getattr(item, 'edited_on', '')) + fields['display_name'] = item.display_name_with_default + fields['org'] = course_key.org + fields['course'] = course_key.course + fields['run'] = course_key.run + fields['course_key'] = six.text_type(course_key) + fields['location'] = six.text_type(item.location) + fields['block_type'] = block_type + fields['detached'] = block_type in DETACHED_XBLOCK_TYPES + + if block_type == 'course': + # prune the checklists field + if 'checklists' in fields: + del fields['checklists'] + + # record the time this command was run + fields['time_last_dumped_to_neo4j'] = six.text_type(timezone.now()) + + return fields, block_type + + +def coerce_types(value): + """ + Args: + value: the value of an xblock's field + + Returns: either the value, a text version of the value, or, if the + value is a list, a list where each element is converted to text. + """ + coerced_value = value + if isinstance(value, list): + coerced_value = [six.text_type(element) for element in coerced_value] + + # if it's not one of the types that neo4j accepts, + # just convert it to text + elif not isinstance(value, PRIMITIVE_NEO4J_TYPES): + coerced_value = six.text_type(value) + + return coerced_value + + +def add_to_transaction(neo4j_entities, transaction): + """ + Args: + neo4j_entities: a list of Nodes or Relationships + transaction: a neo4j transaction + """ + for entity in neo4j_entities: + transaction.create(entity) + + +def get_command_last_run(course_key, graph): + """ + This information is stored on the course node of a course in neo4j + Args: + course_key: a CourseKey + graph: a py2neo Graph + + Returns: The datetime that the command was last run, converted into + text, or None, if there's no record of this command last being run. + """ + selector = NodeSelector(graph) + course_node = selector.select( + "course", + course_key=six.text_type(course_key) + ).first() + + last_this_command_was_run = None + if course_node: + last_this_command_was_run = course_node['time_last_dumped_to_neo4j'] + + return last_this_command_was_run + + +def get_course_last_published(course_key): + """ + We use the CourseStructure table to get when this course was last + published. + Args: + course_key: a CourseKey + + Returns: The datetime the course was last published at, converted into + text, or None, if there's no record of the last time this course + was published. + """ + try: + structure = CourseStructure.objects.get(course_id=course_key) + course_last_published_date = six.text_type(structure.modified) + except CourseStructure.DoesNotExist: + course_last_published_date = None + + return course_last_published_date + + +def serialize_course(course_id): + """ + Serializes a course into py2neo Nodes and Relationships + Args: + course_id: CourseKey of the course we want to serialize + + Returns: + nodes: a list of py2neo Node objects + relationships: a list of py2neo Relationships objects + """ + # create a location to node mapping we'll need later for + # writing relationships + location_to_node = {} + items = modulestore().get_items(course_id) + + # create nodes + for item in items: + fields, block_type = serialize_item(item) + + for field_name, value in six.iteritems(fields): + fields[field_name] = coerce_types(value) + + node = Node(block_type, 'item', **fields) + location_to_node[item.location] = node + + # create relationships + relationships = [] + for item in items: + previous_child_node = None + for index, child_loc in enumerate(item.get_children()): + parent_node = location_to_node.get(item.location) + child_node = location_to_node.get(child_loc.location) + child_node["index"] = index + if parent_node is not None and child_node is not None: + relationship = Relationship(parent_node, "PARENT_OF", child_node) + relationships.append(relationship) + + if previous_child_node: + ordering_relationship = Relationship( + previous_child_node, + "PRECEDES", + child_node, + ) + relationships.append(ordering_relationship) + previous_child_node = child_node + + nodes = location_to_node.values() + return nodes, relationships + + +def should_dump_course(course_key, graph): + """ + Only dump the course if it's been changed since the last time it's been + dumped. + Args: + course_key: a CourseKey object. + graph: a py2neo Graph object. + + Returns: bool of whether this course should be dumped to neo4j. + """ + + last_this_command_was_run = get_command_last_run(course_key, graph) + + course_last_published_date = get_course_last_published(course_key) + + # if we don't have a record of the last time this command was run, + # we should serialize the course and dump it + if last_this_command_was_run is None: + return True + + # if we've serialized the course recently and we have no published + # events, we will not dump it, and so we can skip serializing it + # again here + if last_this_command_was_run and course_last_published_date is None: + return False + + # otherwise, serialize and dump the course if the command was run + # before the course's last published event + return last_this_command_was_run < course_last_published_date + + +@task +def dump_course_to_neo4j(course_key_string, credentials): + """ + Serializes a course and writes it to neo4j. + + Arguments: + course_key: course key for the course to be exported + credentials (dict): the necessary credentials to connect + to neo4j and create a py2neo `Graph` obje + """ + course_key = CourseKey.from_string(course_key_string) + nodes, relationships = serialize_course(course_key) + celery_log.info( + "Now dumping %s to neo4j: %d nodes and %d relationships", + course_key, + len(nodes), + len(relationships), + ) + + graph = authenticate_and_create_graph(credentials) + + transaction = graph.begin() + course_string = six.text_type(course_key) + try: + # first, delete existing course + transaction.run( + "MATCH (n:item) WHERE n.course_key='{}' DETACH DELETE n".format( + course_string + ) + ) + + # now, re-add it + add_to_transaction(nodes, transaction) + add_to_transaction(relationships, transaction) + transaction.commit() + celery_log.info("Completed dumping %s to neo4j", course_key) + + except Exception: # pylint: disable=broad-except + celery_log.exception( + "Error trying to dump course %s to neo4j, rolling back", + course_string + ) + transaction.rollback() + + +class ModuleStoreSerializer(object): + """ + Class with functionality to serialize a modulestore into subgraphs, + one graph per course. + """ + + def __init__(self, course_keys): + self.course_keys = course_keys + + @classmethod + def create(cls, courses=None, skip=None): + """ + Sets the object's course_keys attribute from the `courses` parameter. + If that parameter isn't furnished, loads all course_keys from the + modulestore. + Filters out course_keys in the `skip` parameter, if provided. + Args: + courses: A list of string serializations of course keys. + For example, ["course-v1:org+course+run"]. + skip: Also a list of string serializations of course keys. + """ + if courses: + course_keys = [CourseKey.from_string(course.strip()) for course in courses] + else: + course_keys = [ + course.id for course in modulestore().get_course_summaries() + ] + if skip is not None: + skip_keys = [CourseKey.from_string(course.strip()) for course in skip] + course_keys = [course_key for course_key in course_keys if course_key not in skip_keys] + return cls(course_keys) + + def dump_courses_to_neo4j(self, credentials, override_cache=False): + """ + Method that iterates through a list of courses in a modulestore, + serializes them, then submits tasks to write them to neo4j. + Arguments: + credentials (dict): the necessary credentials to connect + to neo4j and create a py2neo `Graph` object + override_cache: serialize the courses even if they'be been recently + serialized + + Returns: two lists--one of the courses that were successfully written + to neo4j and one of courses that were not. + """ + + total_number_of_courses = len(self.course_keys) + + submitted_courses = [] + skipped_courses = [] + + graph = authenticate_and_create_graph(credentials) + + for index, course_key in enumerate(self.course_keys): + # first, clear the request cache to prevent memory leaks + RequestCache.clear_request_cache() + + log.info( + "Now submitting %s for export to neo4j: course %d of %d total courses", + course_key, + index + 1, + total_number_of_courses, + ) + + if not (override_cache or should_dump_course(course_key, graph)): + log.info("skipping submitting %s, since it hasn't changed", course_key) + skipped_courses.append(six.text_type(course_key)) + continue + + dump_course_to_neo4j.apply_async( + args=[six.text_type(course_key), credentials], + routing_key=settings.COURSEGRAPH_JOB_QUEUE, + ) + submitted_courses.append(six.text_type(course_key)) + + return submitted_courses, skipped_courses + + +def authenticate_and_create_graph(credentials): + """ + This function authenticates with neo4j and creates a py2neo graph object + Arguments: + credentials (dict): a dictionary of credentials used to authenticate, + and then create, a py2neo graph object. + + Returns: a py2neo `Graph` object. + """ + + host = credentials['host'] + https_port = credentials['https_port'] + http_port = credentials['http_port'] + secure = credentials['secure'] + neo4j_user = credentials['user'] + neo4j_password = credentials['password'] + + authenticate( + "{host}:{port}".format( + host=host, port=https_port if secure else http_port + ), + neo4j_user, + neo4j_password, + ) + + graph = Graph( + bolt=True, + password=neo4j_password, + user=neo4j_user, + https_port=https_port, + http_port=http_port, + host=host, + secure=secure, + ) + + return graph