using generators instead of list for sync_hubspot_contact command to
handle OOM issues
This commit is contained in:
@@ -3,9 +3,10 @@ Management command to sync platform users with hubspot
|
||||
./manage.py lms sync_hubspot_contacts
|
||||
./manage.py lms sync_hubspot_contacts --initial-sync-days=7 --batch-size=20
|
||||
"""
|
||||
|
||||
import itertools
|
||||
import json
|
||||
import traceback
|
||||
import urlparse
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
@@ -47,7 +48,7 @@ class Command(BaseCommand):
|
||||
"""
|
||||
api_key = site_conf.get_value('HUBSPOT_API_KEY')
|
||||
last_contact_email = None
|
||||
client = EdxRestApiClient('/'.join([HUBSPOT_API_BASE_URL, 'contacts/v1/lists/all/contacts']))
|
||||
client = EdxRestApiClient(urlparse.urljoin(HUBSPOT_API_BASE_URL, 'contacts/v1/lists/all/contacts'))
|
||||
try:
|
||||
response = client.recent.get(hapikey=api_key, count=1, property='email')
|
||||
if 'contacts' in response:
|
||||
@@ -55,7 +56,7 @@ class Command(BaseCommand):
|
||||
last_contact_email = contact.get('properties').get('email').get('value')
|
||||
|
||||
except (HttpClientError, HttpServerError) as ex:
|
||||
message = "An error occurred while getting recent contact for site {domain}, {message}".format(
|
||||
message = u'An error occurred while getting recent contact for site {domain}, {message}'.format(
|
||||
domain=site_conf.site.domain, message=ex.message
|
||||
)
|
||||
self.stderr.write(message)
|
||||
@@ -72,22 +73,25 @@ class Command(BaseCommand):
|
||||
|
||||
"""
|
||||
if last_synced_user:
|
||||
self.stdout.write(
|
||||
u'Started pulling unsynced contacts for site {site} created after user {user}'.format(
|
||||
site=site_domain, user=last_synced_user
|
||||
)
|
||||
)
|
||||
users = User.objects.select_related('profile').filter(id__gt=last_synced_user.id).order_by('pk')
|
||||
else:
|
||||
# If we don't have last synced user get all users who joined on between today and threshold days ago
|
||||
start_date = datetime.now().date() - timedelta(days_threshold)
|
||||
self.stdout.write(
|
||||
'Started pulling unsynced contacts for site {site} from {start_date}'.format(
|
||||
u'Started pulling unsynced contacts for site {site} from {start_date}'.format(
|
||||
site=site_domain, start_date=start_date
|
||||
)
|
||||
)
|
||||
users = User.objects.select_related('profile').filter(date_joined__date__gte=start_date).order_by('pk')
|
||||
|
||||
unsynced_users = [
|
||||
user for user in use_read_replica_if_available(users)
|
||||
if UserAttribute.get_user_attribute(user, 'created_on_site') == site_domain
|
||||
]
|
||||
return unsynced_users
|
||||
for user in use_read_replica_if_available(users):
|
||||
if UserAttribute.get_user_attribute(user, 'created_on_site') == site_domain:
|
||||
yield user
|
||||
|
||||
def _get_level_of_education_display(self, loe):
|
||||
"""
|
||||
@@ -103,13 +107,18 @@ class Command(BaseCommand):
|
||||
"""
|
||||
Splits user's list into batches
|
||||
Args:
|
||||
users: list of users to be batched
|
||||
users: users (generator) to be batched
|
||||
batch_size: size of batch
|
||||
"""
|
||||
total = len(users)
|
||||
for start in range(0, total, batch_size):
|
||||
end = min(start + batch_size, total)
|
||||
yield (start, end, total, users[start:end])
|
||||
end = 0
|
||||
users_iter = iter(users)
|
||||
while True:
|
||||
users_batch = list(itertools.islice(users_iter, batch_size))
|
||||
if not users_batch:
|
||||
raise StopIteration
|
||||
start = end + 1
|
||||
end += len(users_batch)
|
||||
yield (start, end, users_batch)
|
||||
|
||||
def _escape_json(self, value):
|
||||
"""
|
||||
@@ -123,6 +132,12 @@ class Command(BaseCommand):
|
||||
"""
|
||||
contacts = []
|
||||
for user in users_batch:
|
||||
if not hasattr(user, 'profile'):
|
||||
self.stdout.write(u'skipping user {} due to no profile found'.format(user))
|
||||
continue
|
||||
if not user.profile.meta:
|
||||
self.stdout.write(u'skipping user {} due to no profile meta found'.format(user))
|
||||
continue
|
||||
meta = json.loads(user.profile.meta)
|
||||
contact = {
|
||||
"email": user.email,
|
||||
@@ -166,14 +181,16 @@ class Command(BaseCommand):
|
||||
contacts.append(contact)
|
||||
|
||||
api_key = site_conf.get_value('HUBSPOT_API_KEY')
|
||||
client = EdxRestApiClient('/'.join([HUBSPOT_API_BASE_URL, 'contacts/v1/contact']))
|
||||
client = EdxRestApiClient(urlparse.urljoin(HUBSPOT_API_BASE_URL, 'contacts/v1/contact'))
|
||||
try:
|
||||
client.batch.post(contacts, hapikey=api_key)
|
||||
return len(contacts)
|
||||
except (HttpClientError, HttpServerError) as ex:
|
||||
message = "An error occurred while syncing batch of contacts for site {domain}, {message}".format(
|
||||
message = u'An error occurred while syncing batch of contacts for site {domain}, {message}'.format(
|
||||
domain=site_conf.site.domain, message=ex.message
|
||||
)
|
||||
self.stderr.write(message)
|
||||
return 0
|
||||
|
||||
def add_arguments(self, parser):
|
||||
"""
|
||||
@@ -202,14 +219,21 @@ class Command(BaseCommand):
|
||||
batch_size = options['batch_size']
|
||||
try:
|
||||
hubspot_sites = self._get_hubspot_enabled_sites()
|
||||
|
||||
if not hubspot_sites:
|
||||
self.stdout.write(u'No hubspot enabled site found.')
|
||||
return
|
||||
|
||||
for site_conf in hubspot_sites:
|
||||
successfully_synced_contacts = 0
|
||||
site_domain = site_conf.site.domain
|
||||
self.stdout.write(u'Syncing process started for site {site}'.format(site=site_domain))
|
||||
last_synced_user = None
|
||||
# get recently created contact to set a starting point for sync
|
||||
last_synced_contact_email = self._get_last_synced_contact_email(site_conf)
|
||||
if last_synced_contact_email:
|
||||
self.stdout.write(
|
||||
'Last synced email: {email} for site {site}'.format(
|
||||
u'Last synced email: {email} for site {site}'.format(
|
||||
email=last_synced_contact_email, site=site_domain
|
||||
)
|
||||
)
|
||||
@@ -218,26 +242,36 @@ class Command(BaseCommand):
|
||||
last_synced_user = User.objects.filter(email=last_synced_contact_email).first()
|
||||
if not last_synced_user:
|
||||
self.stdout.write(
|
||||
'Failed to get user for last synced email {email} for site {site}'.format(
|
||||
u'Failed to get user for last synced email {email} for site {site}'.format(
|
||||
email=last_synced_contact_email, site=site_domain
|
||||
)
|
||||
)
|
||||
else:
|
||||
self.stdout.write(
|
||||
u'Last synced email: NOT FOUND for site {site}'.format(site=site_domain)
|
||||
)
|
||||
|
||||
site_unsynced_users = self._get_unsynced_users(site_domain, last_synced_user, initial_sync_days)
|
||||
|
||||
for start, end, total, users_batch in self._get_batched_users(site_unsynced_users, batch_size):
|
||||
for start, end, users_batch in self._get_batched_users(site_unsynced_users, batch_size):
|
||||
self.stdout.write(
|
||||
'Syncing users batch from {start} to {end} of {total} unsynced contacts for site {site}'.format(
|
||||
start=start, end=end, total=total, site=site_domain
|
||||
u'Syncing users batch from {start} to {end} unsynced contacts for site {site}'.format(
|
||||
start=start, end=end, site=site_domain
|
||||
)
|
||||
)
|
||||
self._sync_with_hubspot(users_batch, site_conf)
|
||||
successfully_synced_contacts += self._sync_with_hubspot(users_batch, site_conf)
|
||||
self.stdout.write(
|
||||
'Successfully synced users batch from {start} to {end} of {total} for site {site}'.format(
|
||||
start=start, end=end, total=total, site=site_domain
|
||||
u'Successfully synced users batch from {start} to {end} for site {site}'.format(
|
||||
start=start, end=end, site=site_domain
|
||||
)
|
||||
)
|
||||
|
||||
self.stdout.write(
|
||||
u'{count} contacts found and sycned for site {site}'.format(
|
||||
count=successfully_synced_contacts, site=site_domain
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as ex:
|
||||
traceback.print_exc()
|
||||
raise CommandError('Command failed with traceback %s' % str(ex))
|
||||
raise CommandError(u'Command failed with traceback %s' % str(ex))
|
||||
|
||||
Reference in New Issue
Block a user