From 74e70119a23fa3ffb3db19d4590eccfebd72b659 Mon Sep 17 00:00:00 2001 From: John Eskew Date: Fri, 4 Dec 2015 17:39:27 -0500 Subject: [PATCH] Use a common MongoDB connection function. Add some missing @autoretry_read() decorators. Change to PyMongo 3.x-compatible syntax. --- .../lib/xmodule/xmodule/contentstore/mongo.py | 64 ++++++++++--------- .../xmodule/xmodule/modulestore/mongo/base.py | 37 ++++------- .../split_mongo/mongo_connection.py | 36 +++-------- .../lib/xmodule/xmodule/mongo_connection.py | 53 +++++++++++++++ 4 files changed, 111 insertions(+), 79 deletions(-) create mode 100644 common/lib/xmodule/xmodule/mongo_connection.py diff --git a/common/lib/xmodule/xmodule/contentstore/mongo.py b/common/lib/xmodule/xmodule/contentstore/mongo.py index 6779f246d2..56ccd039cc 100644 --- a/common/lib/xmodule/xmodule/contentstore/mongo.py +++ b/common/lib/xmodule/xmodule/contentstore/mongo.py @@ -1,52 +1,50 @@ +""" +MongoDB/GridFS-level code for the contentstore. +""" +import os +import json import pymongo import gridfs from gridfs.errors import NoFile - -from xmodule.contentstore.content import XASSET_LOCATION_TAG - -import logging - -from .content import StaticContent, ContentStore, StaticContentStream -from xmodule.exceptions import NotFoundError from fs.osfs import OSFS -import os -import json from bson.son import SON + +from mongodb_proxy import autoretry_read from opaque_keys.edx.keys import AssetKey +from xmodule.contentstore.content import XASSET_LOCATION_TAG +from xmodule.exceptions import NotFoundError from xmodule.modulestore.django import ASSET_IGNORE_REGEX from xmodule.util.misc import escape_invalid_characters +from xmodule.mongo_connection import connect_to_mongodb +from .content import StaticContent, ContentStore, StaticContentStream class MongoContentStore(ContentStore): - - # pylint: disable=unused-argument - def __init__(self, host, db, port=27017, user=None, password=None, bucket='fs', collection=None, **kwargs): + """ + MongoDB-backed 
ContentStore. + """ + # pylint: disable=unused-argument, bad-continuation + def __init__( + self, host, db, + port=27017, tz_aware=True, user=None, password=None, bucket='fs', collection=None, **kwargs + ): """ Establish the connection with the mongo backend and connect to the collections :param collection: ignores but provided for consistency w/ other doc_store_config patterns """ - logging.debug('Using MongoDB for static content serving at host={0} port={1} db={2}'.format(host, port, db)) - - # Remove the replicaSet parameter. - kwargs.pop('replicaSet', None) - - _db = pymongo.database.Database( - pymongo.MongoClient( - host=host, - port=port, - document_class=dict, - **kwargs - ), - db + # GridFS will throw an exception if the Database is wrapped in a MongoProxy. So don't wrap it. + # The appropriate methods below are marked as autoretry_read - those methods will handle + # the AutoReconnect errors. + proxy = False + mongo_db = connect_to_mongodb( + db, host, + port=port, tz_aware=tz_aware, user=user, password=password, proxy=proxy, **kwargs ) - if user is not None and password is not None: - _db.authenticate(user, password) + self.fs = gridfs.GridFS(mongo_db, bucket) # pylint: disable=invalid-name - self.fs = gridfs.GridFS(_db, bucket) - - self.fs_files = _db[bucket + ".files"] # the underlying collection GridFS uses + self.fs_files = mongo_db[bucket + ".files"] # the underlying collection GridFS uses def close_connections(self): """ @@ -86,11 +84,15 @@ class MongoContentStore(ContentStore): return content def delete(self, location_or_id): + """ + Delete an asset. 
+ """ if isinstance(location_or_id, AssetKey): location_or_id, _ = self.asset_db_key(location_or_id) # Deletes of non-existent files are considered successful self.fs.delete(location_or_id) + @autoretry_read() def find(self, location, throw_on_not_found=True, as_stream=False): content_id, __ = self.asset_db_key(location) @@ -206,6 +208,7 @@ class MongoContentStore(ContentStore): self.fs_files.remove(query) return assets_to_delete + @autoretry_read() def _get_all_content_for_course(self, course_key, get_thumbnails=False, @@ -288,6 +291,7 @@ class MongoContentStore(ContentStore): if not result.get('updatedExisting', True): raise NotFoundError(asset_db_key) + @autoretry_read() def get_attrs(self, location): """ Gets all of the attributes associated with the given asset. Note, returns even built in attrs diff --git a/common/lib/xmodule/xmodule/modulestore/mongo/base.py b/common/lib/xmodule/xmodule/modulestore/mongo/base.py index df150cb3b6..4bcdfa15a8 100644 --- a/common/lib/xmodule/xmodule/modulestore/mongo/base.py +++ b/common/lib/xmodule/xmodule/modulestore/mongo/base.py @@ -22,7 +22,7 @@ from uuid import uuid4 from bson.son import SON from datetime import datetime from fs.osfs import OSFS -from mongodb_proxy import MongoProxy, autoretry_read +from mongodb_proxy import autoretry_read from path import Path as path from pytz import UTC from contracts import contract, new_contract @@ -43,6 +43,7 @@ from xmodule.error_module import ErrorDescriptor from xmodule.errortracker import null_error_tracker, exc_info_to_str from xmodule.exceptions import HeartbeatFailure from xmodule.mako_module import MakoDescriptorSystem +from xmodule.mongo_connection import connect_to_mongodb from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum, BulkOperationsMixin, BulkOpsRecord from xmodule.modulestore.draft_and_published import ModuleStoreDraftAndPublished, DIRECT_ONLY_CATEGORIES from xmodule.modulestore.edit_info import EditInfoRuntimeMixin @@ -558,22 +559,16 @@ class 
MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo """ Create & open the connection, authenticate, and provide pointers to the collection """ - # Remove the replicaSet parameter. - kwargs.pop('replicaSet', None) + # Set a write concern of 1, which makes writes complete successfully to the primary + # only before returning. Also makes pymongo report write errors. + kwargs['w'] = 1 - self.database = MongoProxy( - pymongo.database.Database( - pymongo.MongoClient( - host=host, - port=port, - tz_aware=tz_aware, - document_class=dict, - **kwargs - ), - db - ), - wait_time=retry_wait_time + self.database = connect_to_mongodb( + db, host, + port=port, tz_aware=tz_aware, user=user, password=password, + retry_wait_time=retry_wait_time, **kwargs ) + self.collection = self.database[collection] # Collection which stores asset metadata. @@ -581,14 +576,8 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo asset_collection = self.DEFAULT_ASSET_COLLECTION_NAME self.asset_collection = self.database[asset_collection] - if user is not None and password is not None: - self.database.authenticate(user, password) - do_connection(**doc_store_config) - # Force mongo to report errors, at the expense of performance - self.collection.write_concern = {'w': 1} - if default_class is not None: module_path, _, class_name = default_class.rpartition('.') class_ = getattr(import_module(module_path), class_name) @@ -1012,6 +1001,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ) return [course for course in base_list if not isinstance(course, ErrorDescriptor)] + @autoretry_read() def _find_one(self, location): '''Look for a given location in the collection. If the item is not present, raise ItemNotFoundError. 
@@ -1052,6 +1042,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo except ItemNotFoundError: return None + @autoretry_read() def has_course(self, course_key, ignore_case=False, **kwargs): """ Returns the course_id of the course if it was found, else None @@ -1073,7 +1064,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo course_query[key] = re.compile(r"(?i)^{}$".format(course_query[key])) else: course_query = {'_id': location.to_deprecated_son()} - course = self.collection.find_one(course_query, fields={'_id': True}) + course = self.collection.find_one(course_query, projection={'_id': True}) if course: return SlashSeparatedCourseKey(course['_id']['org'], course['_id']['course'], course['_id']['name']) else: @@ -1234,7 +1225,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo ('_id.course', re.compile(u'^{}$'.format(course_id.course), re.IGNORECASE)), ('_id.category', 'course'), ]) - courses = self.collection.find(course_search_location, fields=('_id')) + courses = self.collection.find(course_search_location, projection={'_id': True}) if courses.count() > 0: raise DuplicateCourseError(course_id, courses[0]['_id']) diff --git a/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py b/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py index 81dade550e..1352ee95cd 100644 --- a/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py +++ b/common/lib/xmodule/xmodule/modulestore/split_mongo/mongo_connection.py @@ -23,10 +23,11 @@ except ImportError: import dogstats_wrapper as dog_stats_api from contracts import check, new_contract -from mongodb_proxy import autoretry_read, MongoProxy +from mongodb_proxy import autoretry_read from xmodule.exceptions import HeartbeatFailure from xmodule.modulestore import BlockData from xmodule.modulestore.split_mongo import BlockKey +from xmodule.mongo_connection import 
connect_to_mongodb new_contract('BlockData', BlockData) @@ -287,37 +288,20 @@ class MongoConnection(object): """ Create & open the connection, authenticate, and provide pointers to the collections """ - if kwargs.get('replicaSet') is None: - kwargs.pop('replicaSet', None) - mongo_class = pymongo.MongoClient - else: - mongo_class = pymongo.MongoReplicaSetClient - _client = mongo_class( - host=host, - port=port, - tz_aware=tz_aware, - **kwargs - ) - self.database = MongoProxy( - pymongo.database.Database(_client, db), - wait_time=retry_wait_time - ) + # Set a write concern of 1, which makes writes complete successfully to the primary + # only before returning. Also makes pymongo report write errors. + kwargs['w'] = 1 - if user is not None and password is not None: - self.database.authenticate(user, password) + self.database = connect_to_mongodb( + db, host, + port=port, tz_aware=tz_aware, user=user, password=password, + retry_wait_time=retry_wait_time, **kwargs + ) self.course_index = self.database[collection + '.active_versions'] self.structures = self.database[collection + '.structures'] self.definitions = self.database[collection + '.definitions'] - # every app has write access to the db (v having a flag to indicate r/o v write) - # Force mongo to report errors, at the expense of performance - # pymongo docs suck but explanation: - # http://api.mongodb.org/java/2.10.1/com/mongodb/WriteConcern.html - self.course_index.write_concern = {'w': 1} - self.structures.write_concern = {'w': 1} - self.definitions.write_concern = {'w': 1} - def heartbeat(self): """ Check that the db is reachable. diff --git a/common/lib/xmodule/xmodule/mongo_connection.py b/common/lib/xmodule/xmodule/mongo_connection.py new file mode 100644 index 0000000000..ea353a60b6 --- /dev/null +++ b/common/lib/xmodule/xmodule/mongo_connection.py @@ -0,0 +1,53 @@ +""" +Common MongoDB connection functions. 
+"""
+import pymongo
+from mongodb_proxy import MongoProxy
+
+
+# pylint: disable=bad-continuation
+def connect_to_mongodb(
+    db, host,
+    port=27017, tz_aware=True, user=None, password=None,
+    retry_wait_time=0.1, proxy=True, **kwargs
+):
+    """
+    Returns a MongoDB Database connection, optionally wrapped in a proxy. The proxy
+    handles AutoReconnect errors by retrying read operations, since these exceptions
+    typically indicate a temporary step-down condition for MongoDB.
+
+    :param db: name of the database to open
+    :param host: host passed to the pymongo client
+    :param port: port passed to the pymongo client
+    :param tz_aware: tz_aware option passed to the pymongo client
+    :param user: user name to authenticate with (only if password is also given)
+    :param password: password to authenticate with (only if user is also given)
+    :param retry_wait_time: wait_time passed to MongoProxy (unused if proxy is False)
+    :param proxy: if True, wrap the Database in a MongoProxy before returning it
+    :param kwargs: extra options for the pymongo client (e.g. 'replicaSet', 'w')
+    """
+    # MongoReplicaSetClient is deprecated in PyMongo 3.x in favor of using MongoClient
+    # for all connections. Update/simplify this code when using PyMongo 3.x.
+    if kwargs.get('replicaSet'):
+        # A replica set was named, so use MongoReplicaSetClient. This is required for
+        # secondary reads; read_preference should also be set (e.g. SECONDARY_PREFERRED).
+        mongo_client_class = pymongo.MongoReplicaSetClient
+    else:
+        # No replica set, so no secondary reads. Drop an empty/None 'replicaSet' option
+        # rather than forwarding it to MongoClient along with the other kwargs.
+        kwargs.pop('replicaSet', None)
+        mongo_client_class = pymongo.MongoClient
+
+    mongo_client = mongo_client_class(
+        host=host, port=port, tz_aware=tz_aware, document_class=dict, **kwargs
+    )
+    mongo_conn = pymongo.database.Database(mongo_client, db)
+
+    if proxy:
+        mongo_conn = MongoProxy(mongo_conn, wait_time=retry_wait_time)
+
+    # If credentials were provided, authenticate the user.
+    if user is not None and password is not None:
+        mongo_conn.authenticate(user, password)
+
+    return mongo_conn