Slicing users_queryset and fetching users from the database according to

initial_sync_days
This commit is contained in:
Hammad Ahmad Waqas
2019-04-16 11:43:16 +05:00
parent 6623553649
commit 0ea3080ed0
2 changed files with 115 additions and 178 deletions

View File

@@ -3,8 +3,8 @@ Management command to sync platform users with hubspot
./manage.py lms sync_hubspot_contacts
./manage.py lms sync_hubspot_contacts --initial-sync-days=7 --batch-size=20
"""
import itertools
import json
import time
import traceback
import urlparse
from datetime import datetime, timedelta
@@ -20,7 +20,6 @@ from openedx.core.djangoapps.site_configuration.models import SiteConfiguration
from student.models import UserAttribute, UserProfile
from util.query import use_read_replica_if_available
HUBSPOT_API_BASE_URL = 'https://api.hubapi.com'
@@ -41,90 +40,44 @@ class Command(BaseCommand):
]
return hubspot_sites
def _get_last_synced_contact_email(self, site_conf):
def _get_users_queryset(self, initial_days):
"""
Returns: last synced contact email for given site
initial_days: numbers of days to go back from today
:return: users queryset
"""
api_key = site_conf.get_value('HUBSPOT_API_KEY')
last_contact_email = None
client = EdxRestApiClient(urlparse.urljoin(HUBSPOT_API_BASE_URL, 'contacts/v1/lists/all/contacts'))
try:
response = client.recent.get(hapikey=api_key, count=1, property='email')
if 'contacts' in response:
for contact in response['contacts']:
last_contact_email = contact.get('properties').get('email').get('value')
start_date = datetime.now().date() - timedelta(initial_days)
end_date = datetime.now().date() - timedelta(1)
self.stdout.write(u'Getting users from {start} to {end}'.format(start=start_date, end=end_date))
users_qs = User.objects.filter(
date_joined__date__gte=start_date,
date_joined__date__lte=end_date
).order_by('id')
return use_read_replica_if_available(users_qs)
except (HttpClientError, HttpServerError) as ex:
message = u'An error occurred while getting recent contact for site {domain}, {message}'.format(
domain=site_conf.site.domain, message=ex.message
)
self.stderr.write(message)
return last_contact_email
def _get_unsynced_users(self, site_domain, last_synced_user, days_threshold):
def _get_batched_users(self, site_domain, users_queryset, offset, users_query_batch_size):
"""
Args:
site_domain: site where we need unsynced users
last_synced_user: last synced user
days_threshold: number of days threshold to sync users in case we don't have last synced user
users_queryset: users_queryset to slice
users_query_batch_size: slice size
Returns: Ordered list of users needs to be synced
Returns: site users
"""
if last_synced_user:
self.stdout.write(
u'Started pulling unsynced contacts for site {site} created after user {user}'.format(
site=site_domain, user=last_synced_user
)
self.stdout.write(
u'Fetching Users for site {site} from {start} to {end}'.format(
site=site_domain, start=offset, end=offset + users_query_batch_size
)
users = User.objects.select_related('profile').filter(id__gt=last_synced_user.id).order_by('pk')
else:
# If we don't have last synced user get all users who joined on between today and threshold days ago
start_date = datetime.now().date() - timedelta(days_threshold)
self.stdout.write(
u'Started pulling unsynced contacts for site {site} from {start_date}'.format(
site=site_domain, start_date=start_date
)
)
users = User.objects.select_related('profile').filter(date_joined__date__gte=start_date).order_by('pk')
)
users = users_queryset.select_related('profile')[offset: offset + users_query_batch_size]
site_users = [
user for user in users
if UserAttribute.get_user_attribute(user, 'created_on_site') == site_domain
]
self.stdout.write(u'\tSite Users={count}'.format(count=len(site_users)))
for user in use_read_replica_if_available(users):
if UserAttribute.get_user_attribute(user, 'created_on_site') == site_domain:
yield user
def _get_level_of_education_display(self, loe):
    """
    Maps a stored level-of-education code to its descriptive label.

    Args:
        loe: level-of-education code to look up

    Returns: the label paired with ``loe`` in UserProfile.LEVEL_OF_EDUCATION_CHOICES,
        or an empty string when no choice matches
    """
    matching_labels = [
        choice[1]
        for choice in UserProfile.LEVEL_OF_EDUCATION_CHOICES
        if loe == choice[0]
    ]
    # Should a code ever be listed more than once, the last occurrence wins.
    return matching_labels[-1] if matching_labels else ''
def _get_batched_users(self, users, batch_size=100):
"""
Splits user's list into batches
Args:
users: users (generator) to be batched
batch_size: size of batch
"""
end = 0
users_iter = iter(users)
while True:
users_batch = list(itertools.islice(users_iter, batch_size))
if not users_batch:
raise StopIteration
start = end + 1
end += len(users_batch)
yield (start, end, users_batch)
def _escape_json(self, value):
    """
    Returns ``value`` escaped with ``escapejs``.

    Any additional escaping needed before sending values out can be added here.
    """
    escaped_value = escapejs(value)
    return escaped_value
return site_users
def _sync_with_hubspot(self, users_batch, site_conf):
"""
@@ -138,43 +91,46 @@ class Command(BaseCommand):
if not user.profile.meta:
self.stdout.write(u'skipping user {} due to no profile meta found'.format(user))
continue
meta = json.loads(user.profile.meta)
try:
meta = json.loads(user.profile.meta)
except ValueError:
self.stdout.write(u'skipping user {} due to invalid profile meta found'.format(user))
continue
contact = {
"email": user.email,
"properties": [
{
"property": "firstname",
"value": self._escape_json(meta.get('first_name', ''))
"value": meta.get('first_name', '')
},
{
"property": "lastname",
"value": self._escape_json(meta.get('last_name', ''))
"value": meta.get('last_name', '')
},
{
"property": "company",
"value": self._escape_json(meta.get('company', ''))
"value": meta.get('company', '')
},
{
"property": "jobtitle",
"value": self._escape_json(meta.get('title', ''))
"value": meta.get('title', '')
},
{
"property": "state",
"value": self._escape_json(meta.get('state', ''))
"value": meta.get('state', '')
},
{
"property": "country",
"value": self._escape_json(meta.get('country', ''))
"value": meta.get('country', '')
},
{
"property": "gender",
"value": self._escape_json(user.profile.gender)
"value": user.profile.get_gender_display()
},
{
"property": "degree",
"value": self._escape_json(
self._get_level_of_education_display(user.profile.level_of_education)
)
"value": user.profile.get_level_of_education_display()
},
]
}
@@ -192,16 +148,55 @@ class Command(BaseCommand):
self.stderr.write(message)
return 0
def _sync_site(self, site_conf, users_queryset, users_count, contacts_batch_size):
"""
Syncs a single site
"""
site_domain = site_conf.site.domain
self.stdout.write(u'Syncing process started for site {site}'.format(site=site_domain))
offset = 0
users_queue = []
users_query_batch_size = 5000
successfully_synced_contacts = 0
while offset < users_count:
is_last_iteration = (offset + users_query_batch_size) >= users_count
self.stdout.write(
u'Syncing users batch from {start} to {end} for site {site}'.format(
start=offset, end=offset + users_query_batch_size, site=site_domain
)
)
users_queue += self._get_batched_users(site_domain, users_queryset, offset, users_query_batch_size)
while len(users_queue) >= contacts_batch_size \
or (is_last_iteration and users_queue): # for last iteration need to empty users_queue
users_batch = users_queue[:contacts_batch_size]
del users_queue[:contacts_batch_size]
successfully_synced_contacts += self._sync_with_hubspot(users_batch, site_conf)
time.sleep(0.1) # to make sure request per second could not exceed by 10
self.stdout.write(
u'Successfully synced users batch from {start} to {end} for site {site}'.format(
start=offset, end=offset + users_query_batch_size, site=site_domain
)
)
offset += users_query_batch_size
self.stdout.write(
u'{count} contacts found and sycned for site {site}'.format(
count=successfully_synced_contacts, site=site_domain
)
)
def add_arguments(self, parser):
"""
Definition of arguments this command accepts
"""
parser.add_argument(
'--initial-sync-days',
default=7,
default=1,
dest='initial_sync_days',
type=int,
help='Number of days before today to start initial sync',
help='Number of days before today to start sync',
)
parser.add_argument(
'--batch-size',
@@ -218,59 +213,14 @@ class Command(BaseCommand):
initial_sync_days = options['initial_sync_days']
batch_size = options['batch_size']
try:
self.stdout.write(u'Command execution started with options = {}.'.format(options))
hubspot_sites = self._get_hubspot_enabled_sites()
if not hubspot_sites:
self.stdout.write(u'No hubspot enabled site found.')
return
self.stdout.write(u'{count} hubspot enabled sites found.'.format(count=len(hubspot_sites)))
users_queryset = self._get_users_queryset(initial_sync_days)
users_count = users_queryset.count()
self.stdout.write(u'Users count={count}'.format(count=users_count))
for site_conf in hubspot_sites:
successfully_synced_contacts = 0
site_domain = site_conf.site.domain
self.stdout.write(u'Syncing process started for site {site}'.format(site=site_domain))
last_synced_user = None
# get recently created contact to set a starting point for sync
last_synced_contact_email = self._get_last_synced_contact_email(site_conf)
if last_synced_contact_email:
self.stdout.write(
u'Last synced email: {email} for site {site}'.format(
email=last_synced_contact_email, site=site_domain
)
)
# get last synced contact from mysql database
last_synced_user = User.objects.filter(email=last_synced_contact_email).first()
if not last_synced_user:
self.stdout.write(
u'Failed to get user for last synced email {email} for site {site}'.format(
email=last_synced_contact_email, site=site_domain
)
)
else:
self.stdout.write(
u'Last synced email: NOT FOUND for site {site}'.format(site=site_domain)
)
site_unsynced_users = self._get_unsynced_users(site_domain, last_synced_user, initial_sync_days)
for start, end, users_batch in self._get_batched_users(site_unsynced_users, batch_size):
self.stdout.write(
u'Syncing users batch from {start} to {end} unsynced contacts for site {site}'.format(
start=start, end=end, site=site_domain
)
)
successfully_synced_contacts += self._sync_with_hubspot(users_batch, site_conf)
self.stdout.write(
u'Successfully synced users batch from {start} to {end} for site {site}'.format(
start=start, end=end, site=site_domain
)
)
self.stdout.write(
u'{count} contacts found and sycned for site {site}'.format(
count=successfully_synced_contacts, site=site_domain
)
)
self._sync_site(site_conf, users_queryset, users_count, batch_size)
except Exception as ex:
traceback.print_exc()

View File

@@ -37,7 +37,7 @@ class TestHubspotSyncCommand(TestCase):
@classmethod
def _create_users(cls, site_conf):
# Create some test users
for i in range(1, 11):
for i in range(1, 20):
profile_meta = {
"first_name": "First Name{0}".format(i),
"last_name": "Last Name{0}".format(i),
@@ -58,55 +58,42 @@ class TestHubspotSyncCommand(TestCase):
def test_without_any_hubspot_api_key(self):
"""
Test no recent contact call is made if hubspot integration is not enabled for any site
Test no _sync_site call is made if hubspot integration is not enabled for any site
"""
orig_values = self.hubspot_site_config.values
self.hubspot_site_config.values = {}
self.hubspot_site_config.save()
last_synced_contact_email = patch.object(sync_command, '_get_last_synced_contact_email')
mock_last_synced_contact_email = last_synced_contact_email.start()
sync_site = patch.object(sync_command, '_sync_site')
mock_sync_site = sync_site.start()
call_command('sync_hubspot_contacts')
self.assertFalse(mock_last_synced_contact_email.called, "Recent contact API should not be called")
last_synced_contact_email.stop()
self.assertFalse(mock_sync_site.called, "_sync_site should not be called")
sync_site.stop()
# put values back
self.hubspot_site_config.values = orig_values
self.hubspot_site_config.save()
def test_recent_contact_called(self):
def test_with_initial_sync_days(self):
"""
Test recent contact API is called
Test with providing initial sync days
"""
last_synced_contact_email = patch.object(sync_command, '_get_last_synced_contact_email')
mock_last_synced_contact_email = last_synced_contact_email.start()
mock_last_synced_contact_email.return_value = None
call_command('sync_hubspot_contacts')
self.assertTrue(mock_last_synced_contact_email.called, "Recent contact API should be called")
last_synced_contact_email.stop()
sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot')
mock_sync_with_hubspot = sync_with_hubspot.start()
out = StringIO()
call_command('sync_hubspot_contacts', '--initial-sync-days=7', '--batch-size=2', stdout=out)
output = out.getvalue()
self.assertIn('Successfully synced users', output)
self.assertEqual(mock_sync_with_hubspot.call_count, 4) # 4 requests of batch (2, 2, 2, 1), total 7 contacts
sync_with_hubspot.stop()
def test_with_no_recent_contact_found(self):
def test_command_without_initial_sync_days(self):
"""
Test if no recent contact found it should sync all contacts
Test sync last day
"""
with patch.object(sync_command, '_get_last_synced_contact_email', return_value=None):
sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot')
mock_sync_with_hubspot = sync_with_hubspot.start()
out = StringIO()
call_command('sync_hubspot_contacts', '--initial-sync-days=20', '--batch-size=2', stdout=out)
output = out.getvalue()
self.assertIn('Successfully synced users', output)
self.assertEqual(mock_sync_with_hubspot.call_count, 5)
sync_with_hubspot.stop()
def test_with_recent_contact_found(self):
"""
Test only not synched contacts are synced
"""
with patch.object(sync_command, '_get_last_synced_contact_email', return_value=self.users[3].email):
sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot')
mock_sync_with_hubspot = sync_with_hubspot.start()
out = StringIO()
call_command('sync_hubspot_contacts', '--batch-size=3', stdout=out)
output = out.getvalue()
self.assertIn('Successfully synced users', output)
self.assertEqual(mock_sync_with_hubspot.call_count, 2)
sync_with_hubspot.stop()
sync_with_hubspot = patch.object(sync_command, '_sync_with_hubspot')
mock_sync_with_hubspot = sync_with_hubspot.start()
out = StringIO()
call_command('sync_hubspot_contacts', '--batch-size=3', stdout=out)
output = out.getvalue()
self.assertIn('Successfully synced users', output)
self.assertEqual(mock_sync_with_hubspot.call_count, 1)
sync_with_hubspot.stop()