debug pearson import and update unit tests
This commit is contained in:
committed by
Ashley Penney
parent
d395c4448d
commit
1199b1ecfa
@@ -10,6 +10,7 @@ from optparse import make_option
|
||||
from dogapi import dog_http_api, dog_stats_api
|
||||
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
from django.conf import settings
|
||||
|
||||
from student.models import TestCenterUser, TestCenterRegistration
|
||||
|
||||
@@ -23,6 +24,7 @@ class Command(BaseCommand):
|
||||
and TestCenterRegistration tables with status.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def datadog_error(string, tags):
|
||||
dog_http_api.event("Pearson Import", string, alert_type='error', tags=tags)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from optparse import make_option
|
||||
|
||||
from django.contrib.auth.models import User
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
|
||||
from student.models import TestCenterUser, TestCenterUserForm
|
||||
|
||||
@@ -161,15 +161,16 @@ class Command(BaseCommand):
|
||||
if form.is_valid():
|
||||
form.update_and_save()
|
||||
else:
|
||||
errorlist = []
|
||||
if (len(form.errors) > 0):
|
||||
print "Field Form errors encountered:"
|
||||
for fielderror in form.errors:
|
||||
print "Field Form Error: %s" % fielderror
|
||||
if (len(form.non_field_errors()) > 0):
|
||||
print "Non-field Form errors encountered:"
|
||||
for nonfielderror in form.non_field_errors:
|
||||
print "Non-field Form Error: %s" % nonfielderror
|
||||
|
||||
errorlist.append("Field Form errors encountered:")
|
||||
for fielderror in form.errors:
|
||||
errorlist.append("Field Form Error: {}".format(fielderror))
|
||||
if (len(form.non_field_errors()) > 0):
|
||||
errorlist.append("Non-field Form errors encountered:")
|
||||
for nonfielderror in form.non_field_errors:
|
||||
errorlist.append("Non-field Form Error: {}".format(nonfielderror))
|
||||
raise CommandError("\n".join(errorlist))
|
||||
else:
|
||||
print "No changes necessary to make to existing user's demographics."
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import os
|
||||
from optparse import make_option
|
||||
from stat import S_ISDIR
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand, CommandError
|
||||
@@ -26,85 +27,99 @@ class Command(BaseCommand):
|
||||
action='store',
|
||||
dest='mode',
|
||||
default='both',
|
||||
choices=('import', 'export', 'both'),
|
||||
help='mode is import, export, or both'),
|
||||
)
|
||||
|
||||
def handle(self, **options):
|
||||
|
||||
# TODO: this doesn't work. Need to check if it's a property.
|
||||
if not settings.PEARSON:
|
||||
if not hasattr(settings, 'PEARSON'):
|
||||
raise CommandError('No PEARSON entries in auth/env.json.')
|
||||
|
||||
for value in ['LOCAL_IMPORT', 'SFTP_IMPORT', 'LOCAL_EXPORT',
|
||||
'SFTP_EXPORT', 'SFTP_HOSTNAME', 'SFTP_USERNAME', 'SFTP_PASSWORD']:
|
||||
# check settings needed for either import or export:
|
||||
for value in ['SFTP_HOSTNAME', 'SFTP_USERNAME', 'SFTP_PASSWORD', 'S3_BUCKET']:
|
||||
if value not in settings.PEARSON:
|
||||
raise CommandError('No entry in the PEARSON settings'
|
||||
'(env/auth.json) for {0}'.format(value))
|
||||
|
||||
def import_pearson():
|
||||
try:
|
||||
sftp(settings.PEARSON['SFTP_IMPORT'],
|
||||
settings.PEARSON['LOCAL_IMPORT'], options['mode'])
|
||||
s3(settings.PEARSON['LOCAL_IMPORT'],
|
||||
settings.PEARSON['BUCKET'], options['mode'])
|
||||
except Exception as e:
|
||||
dog_http_api.event('Pearson Import failure', str(e))
|
||||
else:
|
||||
for file in os.listdir(settings.PEARSON['LOCAL_IMPORT']):
|
||||
call_command('pearson_import_conf_zip',
|
||||
settings.PEARSON['LOCAL_IMPORT'] + '/' + file)
|
||||
os.remove(file)
|
||||
for value in ['AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY']:
|
||||
if not hasattr(settings, value):
|
||||
raise CommandError('No entry in the AWS settings'
|
||||
'(env/auth.json) for {0}'.format(value))
|
||||
|
||||
# check additional required settings for import and export:
|
||||
if options['mode'] in ('export', 'both'):
|
||||
for value in ['LOCAL_EXPORT','SFTP_EXPORT']:
|
||||
if value not in settings.PEARSON:
|
||||
raise CommandError('No entry in the PEARSON settings'
|
||||
'(env/auth.json) for {0}'.format(value))
|
||||
# make sure that the import directory exists or can be created:
|
||||
source_dir = settings.PEARSON['LOCAL_EXPORT']
|
||||
if not os.path.isdir(source_dir):
|
||||
os.makedirs(source_dir)
|
||||
|
||||
if options['mode'] in ('import', 'both'):
|
||||
for value in ['LOCAL_IMPORT','SFTP_IMPORT']:
|
||||
if value not in settings.PEARSON:
|
||||
raise CommandError('No entry in the PEARSON settings'
|
||||
'(env/auth.json) for {0}'.format(value))
|
||||
# make sure that the import directory exists or can be created:
|
||||
dest_dir = settings.PEARSON['LOCAL_IMPORT']
|
||||
if not os.path.isdir(dest_dir):
|
||||
os.makedirs(dest_dir)
|
||||
|
||||
def export_pearson():
|
||||
call_command('pearson_export_cdd', 'dest_from_settings')
|
||||
call_command('pearson_export_ead', 'dest_from_settings')
|
||||
sftp(settings.PEARSON['LOCAL_EXPORT'],
|
||||
settings.PEARSON['SFTP_EXPORT'], options['mode'])
|
||||
s3(settings.PEARSON['LOCAL_EXPORT'],
|
||||
settings.PEARSON['BUCKET'], options['mode'])
|
||||
|
||||
if options['mode'] == 'export':
|
||||
export_pearson()
|
||||
elif options['mode'] == 'import':
|
||||
import_pearson()
|
||||
else:
|
||||
export_pearson()
|
||||
import_pearson()
|
||||
|
||||
def sftp(files_from, files_to, mode):
|
||||
def sftp(files_from, files_to, mode, deleteAfterCopy=False):
|
||||
with dog_stats_api.timer('pearson.{0}'.format(mode), tags='sftp'):
|
||||
try:
|
||||
t = paramiko.Transport((settings.PEARSON['SFTP_HOSTNAME'], 22))
|
||||
t.connect(username=settings.PEARSON['SFTP_USERNAME'],
|
||||
password=settings.PEARSON['SFTP_PASSWORD'])
|
||||
sftp = paramiko.SFTPClient.from_transport(t)
|
||||
if os.path.isdir(files_from):
|
||||
|
||||
if mode == 'export':
|
||||
try:
|
||||
sftp.chdir(files_to)
|
||||
except IOError:
|
||||
raise CommandError('SFTP destination path does not exist: {}'.format(files_to))
|
||||
for filename in os.listdir(files_from):
|
||||
sftp.put(files_from + '/' + filename,
|
||||
files_to + '/' + filename)
|
||||
sftp.put(files_from + '/' + filename, filename)
|
||||
if deleteAfterCopy:
|
||||
os.remove(os.path.join(files_from, filename))
|
||||
else:
|
||||
for filename in sftp.listdir(files_from):
|
||||
sftp.get(files_from + '/' + filename,
|
||||
files_to + '/' + filename)
|
||||
sftp.remove(files_from + '/' + filename)
|
||||
t.close()
|
||||
try:
|
||||
sftp.chdir(files_from)
|
||||
except IOError:
|
||||
raise CommandError('SFTP source path does not exist: {}'.format(files_from))
|
||||
for filename in sftp.listdir('.'):
|
||||
# skip subdirectories
|
||||
if not S_ISDIR(sftp.stat(filename).st_mode):
|
||||
sftp.get(filename, files_to + '/' + filename)
|
||||
# delete files from sftp server once they are successfully pulled off:
|
||||
if deleteAfterCopy:
|
||||
sftp.remove(filename)
|
||||
except:
|
||||
dog_http_api.event('pearson {0}'.format(mode),
|
||||
'sftp uploading failed',
|
||||
alert_type='error')
|
||||
raise
|
||||
finally:
|
||||
sftp.close()
|
||||
t.close()
|
||||
|
||||
def s3(files_from, bucket, mode):
|
||||
def s3(files_from, bucket, mode, deleteAfterCopy=False):
|
||||
with dog_stats_api.timer('pearson.{0}'.format(mode), tags='s3'):
|
||||
try:
|
||||
for filename in os.listdir(files_from):
|
||||
upload_file_to_s3(bucket, files_from + '/' + filename)
|
||||
upload_file_to_s3(bucket, files_from, filename)
|
||||
if deleteAfterCopy:
|
||||
os.remove(files_from + '/' + filename)
|
||||
except:
|
||||
dog_http_api.event('pearson {0}'.format(mode),
|
||||
's3 archiving failed')
|
||||
raise
|
||||
|
||||
def upload_file_to_s3(bucket, filename):
|
||||
def upload_file_to_s3(bucket, source_dir, filename):
|
||||
"""
|
||||
Upload file to S3
|
||||
"""
|
||||
@@ -114,4 +129,32 @@ class Command(BaseCommand):
|
||||
b = s3.get_bucket(bucket)
|
||||
k = Key(b)
|
||||
k.key = "{filename}".format(filename=filename)
|
||||
k.set_contents_from_filename(filename)
|
||||
k.set_contents_from_filename(os.path.join(source_dir, filename))
|
||||
|
||||
def export_pearson():
|
||||
options = { 'dest-from-settings' : True }
|
||||
call_command('pearson_export_cdd', **options)
|
||||
call_command('pearson_export_ead', **options)
|
||||
mode = 'export'
|
||||
sftp(settings.PEARSON['LOCAL_EXPORT'], settings.PEARSON['SFTP_EXPORT'], mode, deleteAfterCopy = False)
|
||||
s3(settings.PEARSON['LOCAL_EXPORT'], settings.PEARSON['S3_BUCKET'], mode, deleteAfterCopy=True)
|
||||
|
||||
def import_pearson():
|
||||
mode = 'import'
|
||||
try:
|
||||
sftp(settings.PEARSON['SFTP_IMPORT'], settings.PEARSON['LOCAL_IMPORT'], mode, deleteAfterCopy = True)
|
||||
s3(settings.PEARSON['LOCAL_IMPORT'], settings.PEARSON['S3_BUCKET'], mode, deleteAfterCopy=False)
|
||||
except Exception as e:
|
||||
dog_http_api.event('Pearson Import failure', str(e))
|
||||
raise e
|
||||
else:
|
||||
for filename in os.listdir(settings.PEARSON['LOCAL_IMPORT']):
|
||||
filepath = os.path.join(settings.PEARSON['LOCAL_IMPORT'], filename)
|
||||
call_command('pearson_import_conf_zip', filepath)
|
||||
os.remove(filepath)
|
||||
|
||||
# actually do the work!
|
||||
if options['mode'] in ('export', 'both'):
|
||||
export_pearson()
|
||||
if options['mode'] in ('import', 'both'):
|
||||
import_pearson()
|
||||
|
||||
@@ -11,12 +11,12 @@ import sys
|
||||
|
||||
from django.test import TestCase
|
||||
from django.core.management import call_command
|
||||
from nose.plugins.skip import SkipTest
|
||||
|
||||
from student.models import User, TestCenterRegistration, TestCenterUser, get_testcenter_registration
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def create_tc_user(username):
|
||||
user = User.objects.create_user(username, '{}@edx.org'.format(username), 'fakepass')
|
||||
options = {
|
||||
@@ -47,6 +47,48 @@ def create_tc_registration(username, course_id = 'org1/course1/term1', exam_code
|
||||
registrations = get_testcenter_registration(user, course_id, exam_code)
|
||||
return registrations[0]
|
||||
|
||||
def create_multiple_registrations(prefix='test'):
|
||||
username1 = '{}_multiple1'.format(prefix)
|
||||
create_tc_user(username1)
|
||||
create_tc_registration(username1)
|
||||
create_tc_registration(username1, course_id = 'org1/course2/term1')
|
||||
create_tc_registration(username1, exam_code = 'exam2')
|
||||
username2 = '{}_multiple2'.format(prefix)
|
||||
create_tc_user(username2)
|
||||
create_tc_registration(username2)
|
||||
username3 = '{}_multiple3'.format(prefix)
|
||||
create_tc_user(username3)
|
||||
create_tc_registration(username3, course_id = 'org1/course2/term1')
|
||||
username4 = '{}_multiple4'.format(prefix)
|
||||
create_tc_user(username4)
|
||||
create_tc_registration(username4, exam_code = 'exam2')
|
||||
|
||||
def get_command_error_text(*args, **options):
|
||||
stderr_string = None
|
||||
old_stderr = sys.stderr
|
||||
sys.stderr = cStringIO.StringIO()
|
||||
try:
|
||||
call_command(*args, **options)
|
||||
except SystemExit, why1:
|
||||
# The goal here is to catch CommandError calls.
|
||||
# But these are actually translated into nice messages,
|
||||
# and sys.exit(1) is then called. For testing, we
|
||||
# want to catch what sys.exit throws, and get the
|
||||
# relevant text either from stdout or stderr.
|
||||
if (why1.message > 0):
|
||||
stderr_string = sys.stderr.getvalue()
|
||||
else:
|
||||
raise why1
|
||||
except Exception, why:
|
||||
raise why
|
||||
|
||||
finally:
|
||||
sys.stderr = old_stderr
|
||||
|
||||
if stderr_string is None:
|
||||
raise Exception("Expected call to {} to fail, but it succeeded!".format(args[0]))
|
||||
return stderr_string
|
||||
|
||||
def get_error_string_for_management_call(*args, **options):
|
||||
stdout_string = None
|
||||
old_stdout = sys.stdout
|
||||
@@ -55,17 +97,17 @@ def get_error_string_for_management_call(*args, **options):
|
||||
sys.stderr = cStringIO.StringIO()
|
||||
try:
|
||||
call_command(*args, **options)
|
||||
except BaseException, why1:
|
||||
except SystemExit, why1:
|
||||
# The goal here is to catch CommandError calls.
|
||||
# But these are actually translated into nice messages,
|
||||
# and sys.exit(1) is then called. For testing, we
|
||||
# want to catch what sys.exit throws, and get the
|
||||
# relevant text either from stdout or stderr.
|
||||
# TODO: this should really check to see that we
|
||||
# arrived here because of a sys.exit(1). Otherwise
|
||||
# we should just raise the exception.
|
||||
stdout_string = sys.stdout.getvalue()
|
||||
stderr_string = sys.stderr.getvalue()
|
||||
if (why1.message == 1):
|
||||
stdout_string = sys.stdout.getvalue()
|
||||
stderr_string = sys.stderr.getvalue()
|
||||
else:
|
||||
raise why1
|
||||
except Exception, why:
|
||||
raise why
|
||||
|
||||
@@ -111,6 +153,12 @@ class PearsonTestCase(TestCase):
|
||||
# clean up after any test data was dumped to temp directory
|
||||
delete_temp_dir(self.import_dir)
|
||||
delete_temp_dir(self.export_dir)
|
||||
|
||||
# and clean up the database:
|
||||
# TestCenterUser.objects.all().delete()
|
||||
# TestCenterRegistration.objects.all().delete()
|
||||
|
||||
class PearsonCommandTestCase(PearsonTestCase):
|
||||
|
||||
def test_missing_demographic_fields(self):
|
||||
# We won't bother to test all details of form validation here.
|
||||
@@ -119,16 +167,16 @@ class PearsonTestCase(TestCase):
|
||||
username = 'baduser'
|
||||
User.objects.create_user(username, '{}@edx.org'.format(username), 'fakepass')
|
||||
options = {}
|
||||
output_string, _ = get_error_string_for_management_call('pearson_make_tc_user', username, **options)
|
||||
self.assertTrue(output_string.find('Field Form errors encountered:') >= 0)
|
||||
self.assertTrue(output_string.find('Field Form Error: city') >= 0)
|
||||
self.assertTrue(output_string.find('Field Form Error: first_name') >= 0)
|
||||
self.assertTrue(output_string.find('Field Form Error: last_name') >= 0)
|
||||
self.assertTrue(output_string.find('Field Form Error: country') >= 0)
|
||||
self.assertTrue(output_string.find('Field Form Error: phone_country_code') >= 0)
|
||||
self.assertTrue(output_string.find('Field Form Error: phone') >= 0)
|
||||
self.assertTrue(output_string.find('Field Form Error: address_1') >= 0)
|
||||
self.assertErrorContains(output_string, 'Field Form Error: address_1')
|
||||
error_string = get_command_error_text('pearson_make_tc_user', username, **options)
|
||||
self.assertTrue(error_string.find('Field Form errors encountered:') >= 0)
|
||||
self.assertTrue(error_string.find('Field Form Error: city') >= 0)
|
||||
self.assertTrue(error_string.find('Field Form Error: first_name') >= 0)
|
||||
self.assertTrue(error_string.find('Field Form Error: last_name') >= 0)
|
||||
self.assertTrue(error_string.find('Field Form Error: country') >= 0)
|
||||
self.assertTrue(error_string.find('Field Form Error: phone_country_code') >= 0)
|
||||
self.assertTrue(error_string.find('Field Form Error: phone') >= 0)
|
||||
self.assertTrue(error_string.find('Field Form Error: address_1') >= 0)
|
||||
self.assertErrorContains(error_string, 'Field Form Error: address_1')
|
||||
|
||||
def test_create_good_testcenter_user(self):
|
||||
testcenter_user = create_tc_user("test1")
|
||||
@@ -141,11 +189,11 @@ class PearsonTestCase(TestCase):
|
||||
self.assertIsNotNone(registration)
|
||||
|
||||
def test_cdd_missing_option(self):
|
||||
_, error_string = get_error_string_for_management_call('pearson_export_cdd', **{})
|
||||
error_string = get_command_error_text('pearson_export_cdd', **{})
|
||||
self.assertErrorContains(error_string, 'Error: --destination or --dest-from-settings must be used')
|
||||
|
||||
def test_ead_missing_option(self):
|
||||
_, error_string = get_error_string_for_management_call('pearson_export_ead', **{})
|
||||
error_string = get_command_error_text('pearson_export_ead', **{})
|
||||
self.assertErrorContains(error_string, 'Error: --destination or --dest-from-settings must be used')
|
||||
|
||||
def test_export_single_cdd(self):
|
||||
@@ -211,21 +259,7 @@ class PearsonTestCase(TestCase):
|
||||
os.remove(filepath)
|
||||
|
||||
def test_export_multiple(self):
|
||||
username1 = 'test_multiple1'
|
||||
create_tc_user(username1)
|
||||
create_tc_registration(username1)
|
||||
create_tc_registration(username1, course_id = 'org1/course2/term1')
|
||||
create_tc_registration(username1, exam_code = 'exam2')
|
||||
username2 = 'test_multiple2'
|
||||
create_tc_user(username2)
|
||||
create_tc_registration(username2)
|
||||
username3 = 'test_multiple3'
|
||||
create_tc_user(username3)
|
||||
create_tc_registration(username3, course_id = 'org1/course2/term1')
|
||||
username4 = 'test_multiple4'
|
||||
create_tc_user(username4)
|
||||
create_tc_registration(username4, exam_code = 'exam2')
|
||||
|
||||
create_multiple_registrations("export")
|
||||
with self.settings(PEARSON={ 'LOCAL_EXPORT' : self.export_dir }):
|
||||
options = { 'dest-from-settings' : True }
|
||||
call_command('pearson_export_cdd', **options)
|
||||
@@ -239,3 +273,110 @@ class PearsonTestCase(TestCase):
|
||||
os.remove(filepath)
|
||||
|
||||
|
||||
# def test_bad_demographic_option(self):
|
||||
# username = 'nonuser'
|
||||
# output_string, stderrmsg = get_error_string_for_management_call('pearson_make_tc_user', username, **{'--garbage' : None })
|
||||
# print stderrmsg
|
||||
# self.assertErrorContains(stderrmsg, 'Unexpected option')
|
||||
#
|
||||
# def test_missing_demographic_user(self):
|
||||
# username = 'nonuser'
|
||||
# output_string, error_string = get_error_string_for_management_call('pearson_make_tc_user', username, **{})
|
||||
# self.assertErrorContains(error_string, 'User matching query does not exist')
|
||||
|
||||
# credentials for a test SFTP site:
|
||||
SFTP_HOSTNAME = 'ec2-23-20-150-101.compute-1.amazonaws.com'
|
||||
SFTP_USERNAME = 'pearsontest'
|
||||
SFTP_PASSWORD = 'password goes here'
|
||||
|
||||
S3_BUCKET = 'edx-pearson-archive'
|
||||
AWS_ACCESS_KEY_ID = 'put yours here'
|
||||
AWS_SECRET_ACCESS_KEY = 'put yours here'
|
||||
|
||||
class PearsonTransferTestCase(PearsonTestCase):
|
||||
'''
|
||||
Class for tests running Pearson transfers
|
||||
'''
|
||||
|
||||
def test_transfer_config(self):
|
||||
with self.settings(DATADOG_API='FAKE_KEY'):
|
||||
# TODO: why is this failing with the wrong error message?!
|
||||
stderrmsg = get_command_error_text('pearson_transfer', **{'mode' : 'garbage'})
|
||||
self.assertErrorContains(stderrmsg, 'Error: No PEARSON entries')
|
||||
with self.settings(DATADOG_API='FAKE_KEY'):
|
||||
stderrmsg = get_command_error_text('pearson_transfer')
|
||||
self.assertErrorContains(stderrmsg, 'Error: No PEARSON entries')
|
||||
with self.settings(DATADOG_API='FAKE_KEY',
|
||||
PEARSON={'LOCAL_EXPORT' : self.export_dir,
|
||||
'LOCAL_IMPORT' : self.import_dir }):
|
||||
stderrmsg = get_command_error_text('pearson_transfer')
|
||||
self.assertErrorContains(stderrmsg, 'Error: No entry in the PEARSON settings')
|
||||
|
||||
def test_transfer_export_missing_dest_dir(self):
|
||||
raise SkipTest()
|
||||
create_multiple_registrations('export_missing_dest')
|
||||
with self.settings(DATADOG_API='FAKE_KEY',
|
||||
PEARSON={'LOCAL_EXPORT' : self.export_dir,
|
||||
'SFTP_EXPORT' : 'this/does/not/exist',
|
||||
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
|
||||
'SFTP_USERNAME' : SFTP_USERNAME,
|
||||
'SFTP_PASSWORD' : SFTP_PASSWORD,
|
||||
'S3_BUCKET' : S3_BUCKET,
|
||||
},
|
||||
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
|
||||
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
|
||||
options = { 'mode' : 'export'}
|
||||
stderrmsg = get_command_error_text('pearson_transfer', **options)
|
||||
self.assertErrorContains(stderrmsg, 'Error: SFTP destination path does not exist')
|
||||
|
||||
def test_transfer_export(self):
|
||||
raise SkipTest()
|
||||
create_multiple_registrations("transfer_export")
|
||||
with self.settings(DATADOG_API='FAKE_KEY',
|
||||
PEARSON={'LOCAL_EXPORT' : self.export_dir,
|
||||
'SFTP_EXPORT' : 'results/topvue',
|
||||
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
|
||||
'SFTP_USERNAME' : SFTP_USERNAME,
|
||||
'SFTP_PASSWORD' : SFTP_PASSWORD,
|
||||
'S3_BUCKET' : S3_BUCKET,
|
||||
},
|
||||
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
|
||||
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
|
||||
options = { 'mode' : 'export'}
|
||||
# call_command('pearson_transfer', **options)
|
||||
# # confirm that the export directory is still empty:
|
||||
# self.assertEqual(len(os.listdir(self.export_dir)), 0, "expected export directory to be empty")
|
||||
|
||||
def test_transfer_import_missing_source_dir(self):
|
||||
raise SkipTest()
|
||||
create_multiple_registrations('import_missing_src')
|
||||
with self.settings(DATADOG_API='FAKE_KEY',
|
||||
PEARSON={'LOCAL_IMPORT' : self.import_dir,
|
||||
'SFTP_IMPORT' : 'this/does/not/exist',
|
||||
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
|
||||
'SFTP_USERNAME' : SFTP_USERNAME,
|
||||
'SFTP_PASSWORD' : SFTP_PASSWORD,
|
||||
'S3_BUCKET' : S3_BUCKET,
|
||||
},
|
||||
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
|
||||
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
|
||||
options = { 'mode' : 'import'}
|
||||
stderrmsg = get_command_error_text('pearson_transfer', **options)
|
||||
self.assertErrorContains(stderrmsg, 'Error: SFTP source path does not exist')
|
||||
|
||||
def test_transfer_import(self):
|
||||
raise SkipTest()
|
||||
create_multiple_registrations('import_missing_src')
|
||||
with self.settings(DATADOG_API='FAKE_KEY',
|
||||
PEARSON={'LOCAL_IMPORT' : self.import_dir,
|
||||
'SFTP_IMPORT' : 'results',
|
||||
'SFTP_HOSTNAME' : SFTP_HOSTNAME,
|
||||
'SFTP_USERNAME' : SFTP_USERNAME,
|
||||
'SFTP_PASSWORD' : SFTP_PASSWORD,
|
||||
'S3_BUCKET' : S3_BUCKET,
|
||||
},
|
||||
AWS_ACCESS_KEY_ID = AWS_ACCESS_KEY_ID,
|
||||
AWS_SECRET_ACCESS_KEY = AWS_SECRET_ACCESS_KEY):
|
||||
options = { 'mode' : 'import'}
|
||||
call_command('pearson_transfer', **options)
|
||||
self.assertEqual(len(os.listdir(self.import_dir)), 0, "expected import directory to be empty")
|
||||
|
||||
Reference in New Issue
Block a user