Merge pull request #5816 from edx/usman/plat71-mongo-autoreconnect-errors-2
Proxy to retry on mongo Autoreconnect errors.
This commit is contained in:
@@ -20,34 +20,37 @@ import re
|
||||
from uuid import uuid4
|
||||
|
||||
from bson.son import SON
|
||||
from fs.osfs import OSFS
|
||||
from path import path
|
||||
from contracts import contract, new_contract
|
||||
from datetime import datetime
|
||||
from fs.osfs import OSFS
|
||||
from mongodb_proxy import MongoProxy, autoretry_read
|
||||
from path import path
|
||||
from pytz import UTC
|
||||
from contracts import contract, new_contract
|
||||
from operator import itemgetter
|
||||
from sortedcontainers import SortedListWithKey
|
||||
|
||||
from importlib import import_module
|
||||
from xmodule.errortracker import null_error_tracker, exc_info_to_str
|
||||
from xmodule.mako_module import MakoDescriptorSystem
|
||||
from xmodule.error_module import ErrorDescriptor
|
||||
from xblock.runtime import KvsFieldData
|
||||
from xblock.exceptions import InvalidScopeError
|
||||
from xblock.fields import Scope, ScopeIds, Reference, ReferenceList, ReferenceValueDict
|
||||
|
||||
from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum, BulkOperationsMixin, BulkOpsRecord
|
||||
from xmodule.modulestore.draft_and_published import ModuleStoreDraftAndPublished, DIRECT_ONLY_CATEGORIES
|
||||
from opaque_keys.edx.keys import UsageKey, CourseKey, AssetKey
|
||||
from opaque_keys.edx.locations import Location
|
||||
from xmodule.modulestore.exceptions import ItemNotFoundError, DuplicateCourseError, ReferentialIntegrityError
|
||||
from xmodule.modulestore.inheritance import InheritanceMixin, inherit_metadata, InheritanceKeyValueStore
|
||||
from xblock.core import XBlock
|
||||
from opaque_keys.edx.locations import SlashSeparatedCourseKey
|
||||
from opaque_keys.edx.locator import CourseLocator
|
||||
from opaque_keys.edx.keys import UsageKey, CourseKey, AssetKey
|
||||
from xmodule.exceptions import HeartbeatFailure
|
||||
from xmodule.modulestore.edit_info import EditInfoRuntimeMixin
|
||||
|
||||
from xblock.core import XBlock
|
||||
from xblock.exceptions import InvalidScopeError
|
||||
from xblock.fields import Scope, ScopeIds, Reference, ReferenceList, ReferenceValueDict
|
||||
from xblock.runtime import KvsFieldData
|
||||
|
||||
from xmodule.assetstore import AssetMetadata
|
||||
from xmodule.error_module import ErrorDescriptor
|
||||
from xmodule.errortracker import null_error_tracker, exc_info_to_str
|
||||
from xmodule.exceptions import HeartbeatFailure
|
||||
from xmodule.mako_module import MakoDescriptorSystem
|
||||
from xmodule.modulestore import ModuleStoreWriteBase, ModuleStoreEnum, BulkOperationsMixin, BulkOpsRecord
|
||||
from xmodule.modulestore.draft_and_published import ModuleStoreDraftAndPublished, DIRECT_ONLY_CATEGORIES
|
||||
from xmodule.modulestore.edit_info import EditInfoRuntimeMixin
|
||||
from xmodule.modulestore.exceptions import ItemNotFoundError, DuplicateCourseError, ReferentialIntegrityError
|
||||
from xmodule.modulestore.inheritance import InheritanceMixin, inherit_metadata, InheritanceKeyValueStore
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -442,6 +445,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
|
||||
error_tracker=null_error_tracker,
|
||||
i18n_service=None,
|
||||
fs_service=None,
|
||||
retry_wait_time=0.1,
|
||||
**kwargs):
|
||||
"""
|
||||
:param doc_store_config: must have a host, db, and collection entries. Other common entries: port, tz_aware.
|
||||
@@ -455,15 +459,18 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
|
||||
"""
|
||||
Create & open the connection, authenticate, and provide pointers to the collection
|
||||
"""
|
||||
self.database = pymongo.database.Database(
|
||||
pymongo.MongoClient(
|
||||
host=host,
|
||||
port=port,
|
||||
tz_aware=tz_aware,
|
||||
document_class=dict,
|
||||
**kwargs
|
||||
self.database = MongoProxy(
|
||||
pymongo.database.Database(
|
||||
pymongo.MongoClient(
|
||||
host=host,
|
||||
port=port,
|
||||
tz_aware=tz_aware,
|
||||
document_class=dict,
|
||||
**kwargs
|
||||
),
|
||||
db
|
||||
),
|
||||
db
|
||||
wait_time=retry_wait_time
|
||||
)
|
||||
self.collection = self.database[collection]
|
||||
|
||||
@@ -516,9 +523,10 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
|
||||
super(MongoModuleStore, self)._drop_database()
|
||||
|
||||
connection = self.collection.database.connection
|
||||
connection.drop_database(self.collection.database)
|
||||
connection.drop_database(self.collection.database.proxied_object)
|
||||
connection.close()
|
||||
|
||||
@autoretry_read()
|
||||
def fill_in_run(self, course_key):
|
||||
"""
|
||||
In mongo some course_keys are used without runs. This helper function returns
|
||||
@@ -692,6 +700,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
|
||||
item['location'] = item['_id']
|
||||
del item['_id']
|
||||
|
||||
@autoretry_read()
|
||||
def _query_children_for_cache_children(self, course_key, items):
|
||||
"""
|
||||
Generate a pymongo in query for finding the items and return the payloads
|
||||
@@ -796,6 +805,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
|
||||
for item in items
|
||||
]
|
||||
|
||||
@autoretry_read()
|
||||
def get_courses(self, **kwargs):
|
||||
'''
|
||||
Returns a list of course descriptors.
|
||||
@@ -927,6 +937,7 @@ class MongoModuleStore(ModuleStoreDraftAndPublished, ModuleStoreWriteBase, Mongo
|
||||
for key in ('tag', 'org', 'course', 'category', 'name', 'revision')
|
||||
])
|
||||
|
||||
@autoretry_read()
|
||||
def get_items(
|
||||
self,
|
||||
course_id,
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
|
||||
"""
|
||||
import re
|
||||
from mongodb_proxy import autoretry_read, MongoProxy
|
||||
import pymongo
|
||||
import time
|
||||
|
||||
@@ -68,50 +69,28 @@ def structure_to_mongo(structure):
|
||||
return new_structure
|
||||
|
||||
|
||||
def autoretry_read(wait=0.1, retries=5):
|
||||
"""
|
||||
Automatically retry a read-only method in the case of a pymongo
|
||||
AutoReconnect exception.
|
||||
|
||||
See http://emptysqua.re/blog/save-the-monkey-reliably-writing-to-mongodb/
|
||||
for a discussion of this technique.
|
||||
"""
|
||||
def decorate(fn):
|
||||
@wraps(fn)
|
||||
def wrapper(*args, **kwargs):
|
||||
for attempt in xrange(retries):
|
||||
try:
|
||||
return fn(*args, **kwargs)
|
||||
break
|
||||
except AutoReconnect:
|
||||
# Reraise if we failed on our last attempt
|
||||
if attempt == retries - 1:
|
||||
raise
|
||||
|
||||
if wait:
|
||||
time.sleep(wait)
|
||||
return wrapper
|
||||
return decorate
|
||||
|
||||
|
||||
class MongoConnection(object):
|
||||
"""
|
||||
Segregation of pymongo functions from the data modeling mechanisms for split modulestore.
|
||||
"""
|
||||
def __init__(
|
||||
self, db, collection, host, port=27017, tz_aware=True, user=None, password=None, asset_collection=None, **kwargs
|
||||
self, db, collection, host, port=27017, tz_aware=True, user=None, password=None,
|
||||
asset_collection=None, retry_wait_time=0.1, **kwargs
|
||||
):
|
||||
"""
|
||||
Create & open the connection, authenticate, and provide pointers to the collections
|
||||
"""
|
||||
self.database = pymongo.database.Database(
|
||||
pymongo.MongoClient(
|
||||
host=host,
|
||||
port=port,
|
||||
tz_aware=tz_aware,
|
||||
**kwargs
|
||||
self.database = MongoProxy(
|
||||
pymongo.database.Database(
|
||||
pymongo.MongoClient(
|
||||
host=host,
|
||||
port=port,
|
||||
tz_aware=tz_aware,
|
||||
**kwargs
|
||||
),
|
||||
db
|
||||
),
|
||||
db
|
||||
wait_time=retry_wait_time
|
||||
)
|
||||
|
||||
# Remove when adding official Split support for asset metadata storage.
|
||||
@@ -142,7 +121,6 @@ class MongoConnection(object):
|
||||
else:
|
||||
raise HeartbeatFailure("Can't connect to {}".format(self.database.name))
|
||||
|
||||
@autoretry_read()
|
||||
def get_structure(self, key):
|
||||
"""
|
||||
Get the structure from the persistence mechanism whose id is the given key
|
||||
@@ -195,7 +173,6 @@ class MongoConnection(object):
|
||||
"""
|
||||
self.structures.insert(structure_to_mongo(structure))
|
||||
|
||||
@autoretry_read()
|
||||
def get_course_index(self, key, ignore_case=False):
|
||||
"""
|
||||
Get the course_index from the persistence mechanism whose id is the given key
|
||||
@@ -212,7 +189,6 @@ class MongoConnection(object):
|
||||
}
|
||||
return self.course_index.find_one(query)
|
||||
|
||||
@autoretry_read()
|
||||
def find_matching_course_indexes(self, branch=None, search_targets=None):
|
||||
"""
|
||||
Find the course_index matching particular conditions.
|
||||
@@ -271,14 +247,12 @@ class MongoConnection(object):
|
||||
'run': course_index['run'],
|
||||
})
|
||||
|
||||
@autoretry_read()
|
||||
def get_definition(self, key):
|
||||
"""
|
||||
Get the definition from the persistence mechanism whose id is the given key
|
||||
"""
|
||||
return self.definitions.find_one({'_id': key})
|
||||
|
||||
@autoretry_read()
|
||||
def get_definitions(self, definitions):
|
||||
"""
|
||||
Retrieve all definitions listed in `definitions`.
|
||||
|
||||
@@ -56,6 +56,7 @@ import datetime
|
||||
import logging
|
||||
from contracts import contract, new_contract
|
||||
from importlib import import_module
|
||||
from mongodb_proxy import autoretry_read
|
||||
from path import path
|
||||
from pytz import UTC
|
||||
from bson.objectid import ObjectId
|
||||
@@ -775,6 +776,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
|
||||
# add it in the envelope for the structure.
|
||||
return CourseEnvelope(course_key.replace(version_guid=version_guid), entry)
|
||||
|
||||
@autoretry_read()
|
||||
def get_courses(self, branch, **kwargs):
|
||||
'''
|
||||
Returns a list of course descriptors matching any given qualifiers.
|
||||
@@ -2631,6 +2633,7 @@ class SplitMongoModuleStore(SplitBulkWriteMixin, ModuleStoreWriteBase):
|
||||
"""
|
||||
structure['blocks'][block_key] = content
|
||||
|
||||
@autoretry_read()
|
||||
def find_courses_by_search_target(self, field_name, field_value):
|
||||
"""
|
||||
Find all the courses which cached that they have the given field with the given value.
|
||||
|
||||
@@ -70,9 +70,9 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase):
|
||||
Remove the test collections, close the db connection
|
||||
"""
|
||||
split_db = self.split_mongo.db
|
||||
split_db.drop_collection(split_db.course_index)
|
||||
split_db.drop_collection(split_db.structures)
|
||||
split_db.drop_collection(split_db.definitions)
|
||||
split_db.drop_collection(split_db.course_index.proxied_object)
|
||||
split_db.drop_collection(split_db.structures.proxied_object)
|
||||
split_db.drop_collection(split_db.definitions.proxied_object)
|
||||
|
||||
def tear_down_mongo(self):
|
||||
"""
|
||||
@@ -80,7 +80,7 @@ class SplitWMongoCourseBoostrapper(unittest.TestCase):
|
||||
"""
|
||||
split_db = self.split_mongo.db
|
||||
# old_mongo doesn't give a db attr, but all of the dbs are the same
|
||||
split_db.drop_collection(self.draft_mongo.collection)
|
||||
split_db.drop_collection(self.draft_mongo.collection.proxied_object)
|
||||
|
||||
def _create_item(self, category, name, data, metadata, parent_category, parent_name, draft=True, split=True):
|
||||
"""
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
-e git+https://github.com/edx/django-pipeline.git@88ec8a011e481918fdc9d2682d4017c835acd8be#egg=django-pipeline
|
||||
-e git+https://github.com/edx/django-wiki.git@cd0b2b31997afccde519fe5b3365e61a9edb143f#egg=django-wiki
|
||||
-e git+https://github.com/edx/django-oauth2-provider.git@0.2.7-fork-edx-2#egg=django-oauth2-provider
|
||||
-e git+https://github.com/edx/MongoDBProxy.git@efe14679c9263ab491916ed960f5930127e05faf#egg=mongodb_proxy
|
||||
-e git+https://github.com/gabrielfalcao/lettuce.git@cccc3978ad2df82a78b6f9648fe2e9baddd22f88#egg=lettuce
|
||||
-e git+https://github.com/dementrock/pystache_custom.git@776973740bdaad83a3b029f96e415a7d1e8bec2f#egg=pystache_custom-dev
|
||||
-e git+https://github.com/eventbrite/zendesk.git@d53fe0e81b623f084e91776bcf6369f8b7b63879#egg=zendesk
|
||||
|
||||
Reference in New Issue
Block a user