From 0ea3080ed0b9e0100ee5a60602e802059aff69b1 Mon Sep 17 00:00:00 2001 From: Hammad Ahmad Waqas Date: Tue, 16 Apr 2019 11:43:16 +0500 Subject: [PATCH] slicing users_queryset and fetching users from database according to initial_sync_days --- .../commands/sync_hubspot_contacts.py | 228 +++++++----------- .../tests/test_sync_hubspot_contacts.py | 65 ++--- 2 files changed, 115 insertions(+), 178 deletions(-) diff --git a/openedx/core/djangoapps/user_api/management/commands/sync_hubspot_contacts.py b/openedx/core/djangoapps/user_api/management/commands/sync_hubspot_contacts.py index 55b2b903dd..78add32e80 100644 --- a/openedx/core/djangoapps/user_api/management/commands/sync_hubspot_contacts.py +++ b/openedx/core/djangoapps/user_api/management/commands/sync_hubspot_contacts.py @@ -3,8 +3,8 @@ Management command to sync platform users with hubspot ./manage.py lms sync_hubspot_contacts ./manage.py lms sync_hubspot_contacts --initial-sync-days=7 --batch-size=20 """ -import itertools import json +import time import traceback import urlparse from datetime import datetime, timedelta @@ -20,7 +20,6 @@ from openedx.core.djangoapps.site_configuration.models import SiteConfiguration from student.models import UserAttribute, UserProfile from util.query import use_read_replica_if_available - HUBSPOT_API_BASE_URL = 'https://api.hubapi.com' @@ -41,90 +40,44 @@ class Command(BaseCommand): ] return hubspot_sites - def _get_last_synced_contact_email(self, site_conf): + def _get_users_queryset(self, initial_days): """ - Returns: last synced contact email for given site - + initial_days: numbers of days to go back from today + :return: users queryset """ - api_key = site_conf.get_value('HUBSPOT_API_KEY') - last_contact_email = None - client = EdxRestApiClient(urlparse.urljoin(HUBSPOT_API_BASE_URL, 'contacts/v1/lists/all/contacts')) - try: - response = client.recent.get(hapikey=api_key, count=1, property='email') - if 'contacts' in response: - for contact in response['contacts']: - last_contact_email = contact.get('properties').get('email').get('value') + start_date = datetime.now().date() - timedelta(initial_days) + end_date = datetime.now().date() - timedelta(1) + self.stdout.write(u'Getting users from {start} to {end}'.format(start=start_date, end=end_date)) + users_qs = User.objects.filter( + date_joined__date__gte=start_date, + date_joined__date__lte=end_date + ).order_by('id') + return use_read_replica_if_available(users_qs) - except (HttpClientError, HttpServerError) as ex: - message = u'An error occurred while getting recent contact for site {domain}, {message}'.format( - domain=site_conf.site.domain, message=ex.message - ) - self.stderr.write(message) - return last_contact_email - - def _get_unsynced_users(self, site_domain, last_synced_user, days_threshold): + def _get_batched_users(self, site_domain, users_queryset, offset, users_query_batch_size): """ Args: site_domain: site where we need unsynced users - last_synced_user: last synced user - days_threshold: number of days threshold to sync users in case we don't have last synced user + users_queryset: users_queryset to slice + users_query_batch_size: slice size - Returns: Ordered list of users needs to be synced + Returns: site users """ - if last_synced_user: - self.stdout.write( - u'Started pulling unsynced contacts for site {site} created after user {user}'.format( - site=site_domain, user=last_synced_user - ) + + self.stdout.write( + u'Fetching Users for site {site} from {start} to {end}'.format( + site=site_domain, start=offset, end=offset + users_query_batch_size ) - users = User.objects.select_related('profile').filter(id__gt=last_synced_user.id).order_by('pk') - else: - # If we don't have last synced user get all users who joined on between today and threshold days ago - start_date = datetime.now().date() - timedelta(days_threshold) - self.stdout.write( - u'Started pulling unsynced contacts for site {site} from {start_date}'.format( - site=site_domain, start_date=start_date - ) - ) - users = User.objects.select_related('profile').filter(date_joined__date__gte=start_date).order_by('pk') + ) + users = users_queryset.select_related('profile')[offset: offset + users_query_batch_size] + site_users = [ + user for user in users + if UserAttribute.get_user_attribute(user, 'created_on_site') == site_domain + ] + self.stdout.write(u'\tSite Users={count}'.format(count=len(site_users))) - for user in use_read_replica_if_available(users): - if UserAttribute.get_user_attribute(user, 'created_on_site') == site_domain: - yield user - - def _get_level_of_education_display(self, loe): - """ - Returns: Descriptive level of education - """ - level_of_education = '' - for _loe in UserProfile.LEVEL_OF_EDUCATION_CHOICES: - if loe == _loe[0]: - level_of_education = _loe[1] - return level_of_education - - def _get_batched_users(self, users, batch_size=100): - """ - Splits user's list into batches - Args: - users: users (generator) to be batched - batch_size: size of batch - """ - end = 0 - users_iter = iter(users) - while True: - users_batch = list(itertools.islice(users_iter, batch_size)) - if not users_batch: - raise StopIteration - start = end + 1 - end += len(users_batch) - yield (start, end, users_batch) - - def _escape_json(self, value): - """ - Escapes js for now. Additional escaping can be done here. - """ - return escapejs(value) + return site_users def _sync_with_hubspot(self, users_batch, site_conf): """ @@ -138,43 +91,46 @@ class Command(BaseCommand): if not user.profile.meta: self.stdout.write(u'skipping user {} due to no profile meta found'.format(user)) continue - meta = json.loads(user.profile.meta) + try: + meta = json.loads(user.profile.meta) + except ValueError: + self.stdout.write(u'skipping user {} due to invalid profile meta found'.format(user)) + continue + contact = { "email": user.email, "properties": [ { "property": "firstname", - "value": self._escape_json(meta.get('first_name', '')) + "value": meta.get('first_name', '') }, { "property": "lastname", - "value": self._escape_json(meta.get('last_name', '')) + "value": meta.get('last_name', '') }, { "property": "company", - "value": self._escape_json(meta.get('company', '')) + "value": meta.get('company', '') }, { "property": "jobtitle", - "value": self._escape_json(meta.get('title', '')) + "value": meta.get('title', '') }, { "property": "state", - "value": self._escape_json(meta.get('state', '')) + "value": meta.get('state', '') }, { "property": "country", - "value": self._escape_json(meta.get('country', '')) + "value": meta.get('country', '') }, { "property": "gender", - "value": self._escape_json(user.profile.gender) + "value": user.profile.get_gender_display() }, { "property": "degree", - "value": self._escape_json( - self._get_level_of_education_display(user.profile.level_of_education) - ) + "value": user.profile.get_level_of_education_display() }, ] } @@ -192,16 +148,55 @@ class Command(BaseCommand): self.stderr.write(message) return 0 + def _sync_site(self, site_conf, users_queryset, users_count, contacts_batch_size): + """ + Syncs a single site + """ + site_domain = site_conf.site.domain + self.stdout.write(u'Syncing process started for site {site}'.format(site=site_domain)) + + offset = 0 + users_queue = [] + users_query_batch_size = 5000 + successfully_synced_contacts = 0 + + while offset < users_count: + is_last_iteration = (offset + users_query_batch_size) >= users_count + self.stdout.write( + u'Syncing users batch from {start} to {end} for site {site}'.format( + start=offset, end=offset + users_query_batch_size, site=site_domain + ) + ) + users_queue += self._get_batched_users(site_domain, users_queryset, offset, users_query_batch_size) + while len(users_queue) >= contacts_batch_size \ + or (is_last_iteration and users_queue): # for last iteration need to empty users_queue + users_batch = users_queue[:contacts_batch_size] + del users_queue[:contacts_batch_size] + successfully_synced_contacts += self._sync_with_hubspot(users_batch, site_conf) + time.sleep(0.1) # to make sure request per second could not exceed by 10 + self.stdout.write( + u'Successfully synced users batch from {start} to {end} for site {site}'.format( + start=offset, end=offset + users_query_batch_size, site=site_domain + ) + ) + offset += users_query_batch_size + + self.stdout.write( + u'{count} contacts found and sycned for site {site}'.format( + count=successfully_synced_contacts, site=site_domain + ) + ) + def add_arguments(self, parser): """ Definition of arguments this command accepts """ parser.add_argument( '--initial-sync-days', - default=7, + default=1, dest='initial_sync_days', type=int, - help='Number of days before today to start initial sync', + help='Number of days before today to start sync', ) parser.add_argument( '--batch-size', @@ -218,59 +213,14 @@ class Command(BaseCommand): initial_sync_days = options['initial_sync_days'] batch_size = options['batch_size'] try: + self.stdout.write(u'Command execution started with options = {}.'.format(options)) hubspot_sites = self._get_hubspot_enabled_sites() - - if not hubspot_sites: - self.stdout.write(u'No hubspot enabled site found.') - return - + self.stdout.write(u'{count} hubspot enabled sites found.'.format(count=len(hubspot_sites))) + users_queryset = self._get_users_queryset(initial_sync_days) + users_count = users_queryset.count() + self.stdout.write(u'Users count={count}'.format(count=users_count)) for site_conf in hubspot_sites: - successfully_synced_contacts = 0 - site_domain = site_conf.site.domain - self.stdout.write(u'Syncing process started for site {site}'.format(site=site_domain)) - last_synced_user = None - # get recently created contact to set a starting point for sync - last_synced_contact_email = self._get_last_synced_contact_email(site_conf) - if last_synced_contact_email: - self.stdout.write( - u'Last synced email: {email} for site {site}'.format( - email=last_synced_contact_email, site=site_domain - ) - ) - - # get last synced contact from mysql database - last_synced_user = User.objects.filter(email=last_synced_contact_email).first() - if not last_synced_user: - self.stdout.write( - u'Failed to get user for last synced email {email} for site {site}'.format( - email=last_synced_contact_email, site=site_domain - ) - ) - else: - self.stdout.write( - u'Last synced email: NOT FOUND for site {site}'.format(site=site_domain) - ) - - site_unsynced_users = self._get_unsynced_users(site_domain, last_synced_user, initial_sync_days) - - for start, end, users_batch in self._get_batched_users(site_unsynced_users, batch_size): - self.stdout.write( - u'Syncing users batch from {start} to {end} unsynced contacts for site {site}'.format( - start=start, end=end, site=site_domain - ) - ) - successfully_synced_contacts += self._sync_with_hubspot(users_batch, site_conf) - self.stdout.write( - u'Successfully synced users batch from {start} to {end} for site {site}'.format( - start=start, end=end, site=site_domain - ) - ) - - self.stdout.write( - u'{count} contacts found and sycned for site {site}'.format( - count=successfully_synced_contacts, site=site_domain - ) - ) + self._sync_site(site_conf, users_queryset, users_count, batch_size) except Exception as ex: traceback.print_exc() diff --git a/openedx/core/djangoapps/user_api/management/tests/test_sync_hubspot_contacts.py b/openedx/core/djangoapps/user_api/management/tests/test_sync_hubspot_contacts.py index caf71f8ad6..e6e32df721 100644 --- a/openedx/core/djangoapps/user_api/management/tests/test_sync_hubspot_contacts.py +++ b/openedx/core/djangoapps/user_api/management/tests/test_sync_hubspot_contacts.py @@ -37,7 +37,7 @@ class TestHubspotSyncCommand(TestCase): @classmethod def _create_users(cls, site_conf): # Create some test users - for i in range(1, 11): + for i in range(1, 20): profile_meta = { "first_name": "First Name{0}".format(i), "last_name": "Last Name{0}".format(i), @@ -58,55 +58,42 @@ class TestHubspotSyncCommand(TestCase): def test_without_any_hubspot_api_key(self): """ - Test no recent contact call is made if hubspot integration is not enabled for any site + Test no _sync_site call is made if hubspot integration is not enabled for any site """ orig_values = self.hubspot_site_config.values self.hubspot_site_config.values = {} self.hubspot_site_config.save() - last_synced_contact_email = patch.object(sync_command, '_get_last_synced_contact_email') - mock_last_synced_contact_email = last_synced_contact_email.start() + sync_site = patch.object(sync_command, '_sync_site') + mock_sync_site = sync_site.start() call_command('sync_hubspot_contacts') - self.assertFalse(mock_last_synced_contact_email.called, "Recent contact API should not be called") - last_synced_contact_email.stop() + self.assertFalse(mock_sync_site.called, "_sync_site should not be called") + sync_site.stop() # put values back self.hubspot_site_config.values = orig_values self.hubspot_site_config.save() - def test_recent_contact_called(self): + def test_with_initial_sync_days(self): """ - Test recent contact API is called + Test with providing initial sync days """ - last_synced_contact_email = patch.object(sync_command, '_get_last_synced_contact_email') - mock_last_synced_contact_email = last_synced_contact_email.start() - mock_last_synced_contact_email.return_value = None - call_command('sync_hubspot_contacts') - self.assertTrue(mock_last_synced_contact_email.called, "Recent contact API should be called") - last_synced_contact_email.stop() + sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot') + mock_sync_with_hubspot = sync_with_hubspot.start() + out = StringIO() + call_command('sync_hubspot_contacts', '--initial-sync-days=7', '--batch-size=2', stdout=out) + output = out.getvalue() + self.assertIn('Successfully synced users', output) + self.assertEqual(mock_sync_with_hubspot.call_count, 4) # 4 requests of batch (2, 2, 2, 1), total 7 contacts + sync_with_hubspot.stop() - def test_with_no_recent_contact_found(self): + def test_command_without_initial_sync_days(self): """ - Test if no recent contact found it should sync all contacts + Test sync last day """ - with patch.object(sync_command, '_get_last_synced_contact_email', return_value=None): - sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot') - mock_sync_with_hubspot = sync_with_hubspot.start() - out = StringIO() - call_command('sync_hubspot_contacts', '--initial-sync-days=20', '--batch-size=2', stdout=out) - output = out.getvalue() - self.assertIn('Successfully synced users', output) - self.assertEqual(mock_sync_with_hubspot.call_count, 5) - sync_with_hubspot.stop() - - def test_with_recent_contact_found(self): - """ - Test only not synched contacts are synced - """ - with patch.object(sync_command, '_get_last_synced_contact_email', return_value=self.users[3].email): - sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot') - mock_sync_with_hubspot = sync_with_hubspot.start() - out = StringIO() - call_command('sync_hubspot_contacts', '--batch-size=3', stdout=out) - output = out.getvalue() - self.assertIn('Successfully synced users', output) - self.assertEqual(mock_sync_with_hubspot.call_count, 2) - sync_with_hubspot.stop() + sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot') + mock_sync_with_hubspot = sync_with_hubspot.start() + out = StringIO() + call_command('sync_hubspot_contacts', '--batch-size=3', stdout=out) + output = out.getvalue() + self.assertIn('Successfully synced users', output) + self.assertEqual(mock_sync_with_hubspot.call_count, 1) + sync_with_hubspot.stop()